From ad1cf71b51fdfbe2869a63ca4e8d9a7d4bd3e4e9 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 22 Jun 2026 13:53:39 +0800 Subject: [PATCH] Fix pipe tree database creation on receiver (#17991) (cherry picked from commit 28c4e68a6c40fb8210eab994f50c647d40ba17a3) --- .../thrift/IoTDBDataNodeReceiver.java | 107 ++++++++++++++++++ .../plan/planner/TreeModelPlanner.java | 7 +- .../scheduler/load/LoadTsFileScheduler.java | 26 ++++- .../thrift/IoTDBDataNodeReceiverTest.java | 19 ++++ .../load/LoadTsFileSchedulerTest.java | 36 +++++- 5 files changed, 189 insertions(+), 6 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java index 23f537fa2aea8..932747f761bdf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java @@ -22,6 +22,8 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.exception.IoTDBException; +import org.apache.iotdb.commons.exception.IoTDBRuntimeException; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.pipe.config.PipeConfig; @@ -74,7 +76,9 @@ import org.apache.iotdb.db.queryengine.plan.Coordinator; import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher; import org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher; +import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult; import org.apache.iotdb.db.queryengine.plan.execution.config.executor.ClusterConfigTaskExecutor; +import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.DatabaseSchemaTask; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.AlterLogicalViewNode; import org.apache.iotdb.db.queryengine.plan.statement.Statement; import org.apache.iotdb.db.queryengine.plan.statement.StatementType; @@ -97,6 +101,7 @@ import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp; +import com.google.common.util.concurrent.ListenableFuture; import org.apache.tsfile.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -113,11 +118,15 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils.getRootCause; + public class IoTDBDataNodeReceiver extends IoTDBFileReceiver { private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBDataNodeReceiver.class); @@ -152,6 +161,14 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver { private static final SessionManager SESSION_MANAGER = SessionManager.getInstance(); private PipeMemoryBlock allocatedMemoryBlock; + private final Set autoCreatedTreeDatabases = ConcurrentHashMap.newKeySet(); + private final Set conflictedTreeDatabases = ConcurrentHashMap.newKeySet(); + + private enum TreeDatabaseCreationResult { + SKIPPED, + CREATED_OR_EXISTED, + CONFLICTED + } static { try { @@ -876,6 +893,11 @@ private TSStatus executeStatement(final Statement statement) { return RpcUtils.getStatus(status.getCode(), status.getMessage()); } + if (autoCreateTreeDatabaseIfNecessary(getTreeDatabaseName(statement)) + == TreeDatabaseCreationResult.CONFLICTED) { + clearTreeDatabaseName(statement); + } + return Coordinator.getInstance() .executeForTreeModel( shouldMarkAsPipeRequest.get() ? new PipeEnrichedStatement(statement) : statement, @@ -889,6 +911,91 @@ private TSStatus executeStatement(final Statement statement) { .status; } + private TreeDatabaseCreationResult autoCreateTreeDatabaseIfNecessary(final String database) { + if (database == null + || LoadTsFileStatement.getDatabaseLevelByTreeDatabase(database) == null + || !IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()) { + return TreeDatabaseCreationResult.SKIPPED; + } + if (autoCreatedTreeDatabases.contains(database)) { + return TreeDatabaseCreationResult.CREATED_OR_EXISTED; + } + if (conflictedTreeDatabases.contains(database)) { + return TreeDatabaseCreationResult.CONFLICTED; + } + + try { + final DatabaseSchemaStatement statement = + new DatabaseSchemaStatement(DatabaseSchemaStatement.DatabaseSchemaStatementType.CREATE); + statement.setDatabasePath(new PartialPath(database)); + statement.setEnablePrintExceptionLog(false); + + final TSStatus permissionStatus = statement.checkPermissionBeforeProcess(username); + if (permissionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + throw new PipeException(permissionStatus.getMessage()); + } + + final DatabaseSchemaTask task = new DatabaseSchemaTask(statement); + final ListenableFuture future = + task.execute(ClusterConfigTaskExecutor.getInstance()); + final ConfigTaskResult result = future.get(); + final int statusCode = result.getStatusCode().getStatusCode(); + if (statusCode == TSStatusCode.SUCCESS_STATUS.getStatusCode() + || statusCode == TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) { + autoCreatedTreeDatabases.add(database); + return TreeDatabaseCreationResult.CREATED_OR_EXISTED; + } + if (statusCode == TSStatusCode.DATABASE_CONFLICT.getStatusCode()) { + conflictedTreeDatabases.add(database); + return TreeDatabaseCreationResult.CONFLICTED; + } + throw new PipeException( + String.format( + "Auto create tree database failed: %s, status: %s", + database, result.getStatus() == null ? result.getStatusCode() : result.getStatus())); + } catch (final IllegalPathException e) { + throw new PipeException(String.format("Illegal tree database %s.", database), e); + } catch (final ExecutionException | InterruptedException e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + final Throwable rootCause = getRootCause(e); + final int errorCode; + if (rootCause instanceof IoTDBException) { + errorCode = ((IoTDBException) rootCause).getErrorCode(); + } else if (rootCause instanceof IoTDBRuntimeException) { + errorCode = ((IoTDBRuntimeException) rootCause).getErrorCode(); + } else { + errorCode = TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(); + } + if (errorCode == TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) { + autoCreatedTreeDatabases.add(database); + return TreeDatabaseCreationResult.CREATED_OR_EXISTED; + } + if (errorCode == TSStatusCode.DATABASE_CONFLICT.getStatusCode()) { + conflictedTreeDatabases.add(database); + return TreeDatabaseCreationResult.CONFLICTED; + } + throw new PipeException("Auto create tree database failed because " + e.getMessage(), e); + } + } + + private String getTreeDatabaseName(final Statement statement) { + if (statement instanceof LoadTsFileStatement) { + return ((LoadTsFileStatement) statement).getDatabase(); + } + return null; + } + + static void clearTreeDatabaseName(final Statement statement) { + if (statement instanceof LoadTsFileStatement) { + final LoadTsFileStatement loadTsFileStatement = (LoadTsFileStatement) statement; + loadTsFileStatement.setDatabase(null); + loadTsFileStatement.setDatabaseLevel( + IoTDBDescriptor.getInstance().getConfig().getDefaultDatabaseLevel()); + } + } + @Override protected TSStatus login() { final IClientSession session = SESSION_MANAGER.getCurrSession(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java index 2812d81cf27fc..d7af3b8fa3d4c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java @@ -112,6 +112,10 @@ public IScheduler doSchedule( && ((PipeEnrichedStatement) statement).getInnerStatement() instanceof LoadTsFileStatement; if (statement instanceof LoadTsFileStatement || isPipeEnrichedTsFileLoad) { + final LoadTsFileStatement loadTsFileStatement = + statement instanceof LoadTsFileStatement + ? (LoadTsFileStatement) statement + : (LoadTsFileStatement) ((PipeEnrichedStatement) statement).getInnerStatement(); scheduler = new LoadTsFileScheduler( distributedPlan, @@ -119,7 +123,8 @@ public IScheduler doSchedule( stateMachine, syncInternalServiceClientManager, partitionFetcher, - isPipeEnrichedTsFileLoad); + isPipeEnrichedTsFileLoad, + loadTsFileStatement.getDatabase()); } else { scheduler = new ClusterScheduler( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java index e61367ed896a0..46dd02d0aba1e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java @@ -82,6 +82,7 @@ import java.io.DataOutputStream; import java.io.File; +import java.io.FileNotFoundException; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -136,6 +137,7 @@ public class LoadTsFileScheduler implements IScheduler { private final PlanFragmentId fragmentId; private final Set allReplicaSets; private final boolean isGeneratedByPipe; + private final String treeDatabaseForRetry; private final Map timePartitionSlotToProgressIndex; private final LoadTsFileDataCacheMemoryBlock block; @@ -145,7 +147,8 @@ public LoadTsFileScheduler( QueryStateMachine stateMachine, IClientManager internalServiceClientManager, IPartitionFetcher partitionFetcher, - boolean isGeneratedByPipe) { + boolean isGeneratedByPipe, + String treeDatabaseForRetry) { this.queryContext = queryContext; this.stateMachine = stateMachine; this.tsFileNodeList = new ArrayList<>(); @@ -155,6 +158,7 @@ public LoadTsFileScheduler( this.partitionFetcher = new DataPartitionBatchFetcher(partitionFetcher); this.allReplicaSets = new HashSet<>(); this.isGeneratedByPipe = isGeneratedByPipe; + this.treeDatabaseForRetry = treeDatabaseForRetry; this.timePartitionSlotToProgressIndex = new HashMap<>(); this.block = LoadTsFileMemoryManager.getInstance().allocateDataCacheMemoryBlock(); @@ -551,9 +555,7 @@ private void convertFailedTsFilesToTabletsAndRetry() { final TSStatus status = loadTsFileDataTypeConverter .convertForTreeModel( - LoadTsFileStatement.createUnchecked(filePath) - .setDeleteAfterLoad(failedNode.isDeleteAfterLoad()) - .setConvertOnTypeMismatch(true)) + buildRetryTreeLoadStatement(filePath, failedNode.isDeleteAfterLoad())) .orElse(null); if (loadTsFileDataTypeConverter.isSuccessful(status)) { @@ -592,6 +594,22 @@ private void convertFailedTsFilesToTabletsAndRetry() { } } + private LoadTsFileStatement buildRetryTreeLoadStatement( + final String filePath, final boolean deleteAfterLoad) throws FileNotFoundException { + final LoadTsFileStatement statement = + LoadTsFileStatement.createUnchecked(filePath) + .setDeleteAfterLoad(deleteAfterLoad) + .setConvertOnTypeMismatch(true); + if (treeDatabaseForRetry != null) { + statement.setDatabase(treeDatabaseForRetry); + statement.updateDatabaseLevelByTreeDatabase(); + } + if (isGeneratedByPipe) { + statement.markIsGeneratedByPipe(); + } + return statement; + } + @Override public void stop(Throwable t) { // Do nothing diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java index 864d049397949..af0fa57e0745a 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java @@ -152,4 +152,23 @@ public void testLoadTsFileSyncStatementCanSkipVerifySchemaWhenNotConvertingType( Files.deleteIfExists(tsFile); } } + + @Test + public void testClearTreeDatabaseNameForLoadTsFileStatement() throws Exception { + final Path tsFile = Files.createTempFile("pipe-load-clear-tree-database", ".tsfile"); + try { + final LoadTsFileStatement statement = + IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync( + "root.test.sg_0", tsFile.toString(), true, true); + + IoTDBDataNodeReceiver.clearTreeDatabaseName(statement); + + Assert.assertNull(statement.getDatabase()); + Assert.assertEquals( + IoTDBDescriptor.getInstance().getConfig().getDefaultDatabaseLevel(), + statement.getDatabaseLevel()); + } finally { + Files.deleteIfExists(tsFile); + } + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileSchedulerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileSchedulerTest.java index 161a95b4ec9de..0add75e299048 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileSchedulerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileSchedulerTest.java @@ -27,6 +27,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan; import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment; import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan; +import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; import org.junit.Assert; import org.junit.Before; @@ -34,6 +35,10 @@ import org.mockito.Mock; import org.mockito.MockitoAnnotations; +import java.io.File; +import java.lang.reflect.Method; +import java.util.Collections; + import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; @@ -50,6 +55,7 @@ public void before() { when(distributedQueryPlan.getRootSubPlan()).thenReturn(subPlan); when(subPlan.getPlanFragment()).thenReturn(planFragment); when(planFragment.getId()).thenReturn(new PlanFragmentId("test", 0)); + when(distributedQueryPlan.getInstances()).thenReturn(Collections.emptyList()); } @Test @@ -62,9 +68,37 @@ public void tt() { mock(QueryStateMachine.class), mock(IClientManager.class), mock(IPartitionFetcher.class), - false)); + false, + null)); t.start(); Assert.assertNull(t.getTotalCpuTime()); Assert.assertNull(t.getFragmentInfo()); } + + @Test + public void testBuildRetryTreeLoadStatementUpdatesDatabaseLevel() throws Exception { + final LoadTsFileScheduler scheduler = + new LoadTsFileScheduler( + distributedQueryPlan, + mock(MPPQueryContext.class), + mock(QueryStateMachine.class), + mock(IClientManager.class), + mock(IPartitionFetcher.class), + true, + "root.test.sg_0"); + final Method method = + LoadTsFileScheduler.class.getDeclaredMethod( + "buildRetryTreeLoadStatement", String.class, boolean.class); + method.setAccessible(true); + + final File tsFile = File.createTempFile("test", ".tsfile"); + tsFile.deleteOnExit(); + + final LoadTsFileStatement statement = + (LoadTsFileStatement) method.invoke(scheduler, tsFile.getAbsolutePath(), true); + + Assert.assertEquals("root.test.sg_0", statement.getDatabase()); + Assert.assertEquals(2, statement.getDatabaseLevel()); + Assert.assertTrue(statement.isGeneratedByPipe()); + } }