diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java index 5e418072a7d97..4caef4cc1f9f7 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java @@ -143,4 +143,10 @@ public DataNodeConfig setQueryCostStatWindow(int queryCostStatWindow) { setProperty("query_cost_stat_window", String.valueOf(queryCostStatWindow)); return this; } + + @Override + public DataNodeConfig setDnDataDirs(String dnDataDirs) { + setProperty("dn_data_dirs", dnDataDirs); + return this; + } } diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java index dac6cf3fcc3cc..d205044bd5290 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java @@ -44,7 +44,6 @@ import static org.apache.iotdb.it.env.cluster.ClusterConstant.DEFAULT_DATA_NODE_PROPERTIES; import static org.apache.iotdb.it.env.cluster.ClusterConstant.DN_CONNECTION_TIMEOUT_MS; import static org.apache.iotdb.it.env.cluster.ClusterConstant.DN_CONSENSUS_DIR; -import static org.apache.iotdb.it.env.cluster.ClusterConstant.DN_DATA_DIRS; import static org.apache.iotdb.it.env.cluster.ClusterConstant.DN_DATA_REGION_CONSENSUS_PORT; import static org.apache.iotdb.it.env.cluster.ClusterConstant.DN_JOIN_CLUSTER_RETRY_INTERVAL_MS; import static org.apache.iotdb.it.env.cluster.ClusterConstant.DN_METRIC_INTERNAL_REPORTER_TYPE; @@ -125,7 +124,6 @@ public DataNodeWrapper( immutableNodeProperties.setProperty(IoTDBConstant.DN_SEED_CONFIG_NODE, seedConfigNode); 
immutableNodeProperties.setProperty(DN_SYSTEM_DIR, MppBaseConfig.NULL_VALUE); - immutableNodeProperties.setProperty(DN_DATA_DIRS, MppBaseConfig.NULL_VALUE); immutableNodeProperties.setProperty(DN_CONSENSUS_DIR, MppBaseConfig.NULL_VALUE); immutableNodeProperties.setProperty(DN_WAL_DIRS, MppBaseConfig.NULL_VALUE); immutableNodeProperties.setProperty(DN_TRACING_DIR, MppBaseConfig.NULL_VALUE); diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java index bba4c964f9578..aa73d962dbab1 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java @@ -98,4 +98,9 @@ public DataNodeConfig setDataNodeMemoryProportion(String dataNodeMemoryProportio public DataNodeConfig setQueryCostStatWindow(int queryCostStatWindow) { return this; } + + @Override + public DataNodeConfig setDnDataDirs(String dnDataDirs) { + return this; + } } diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java index d57015b13964e..01a6114c20679 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java @@ -53,4 +53,6 @@ DataNodeConfig setLoadTsFileAnalyzeSchemaMemorySizeInBytes( DataNodeConfig setDataNodeMemoryProportion(String dataNodeMemoryProportion); DataNodeConfig setQueryCostStatWindow(int queryCostStatWindow); + + DataNodeConfig setDnDataDirs(String dnDataDirs); } diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv1/IoTDBRegionMigrateWithDeletionMultiDataDirIT.java 
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv1/IoTDBRegionMigrateWithDeletionMultiDataDirIT.java new file mode 100644 index 0000000000000..43b2d1bc992a9 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv1/IoTDBRegionMigrateWithDeletionMultiDataDirIT.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1; + +import org.apache.iotdb.consensus.ConsensusFactory; +import org.apache.iotdb.isession.SessionConfig; +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.ClusterIT; +import org.apache.iotdb.itbase.env.BaseEnv; + +import org.apache.tsfile.utils.Pair; +import org.awaitility.Awaitility; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework.getAllDataNodes; +import static org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework.getDataRegionMapWithLeader; + +@RunWith(IoTDBTestRunner.class) +@Category({ClusterIT.class}) +public class IoTDBRegionMigrateWithDeletionMultiDataDirIT { + + private static final String MULTI_DATA_DIRS = + "data/datanode/data/disk0,data/datanode/data/disk1,data/datanode/data/disk2"; + + @Before + public void setUp() throws Exception { + EnvFactory.getEnv() + .getConfig() + .getCommonConfig() + .setDataReplicationFactor(2) + .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS); + EnvFactory.getEnv().getConfig().getDataNodeConfig().setDnDataDirs(MULTI_DATA_DIRS); + EnvFactory.getEnv().initClusterEnvironment(1, 3); + } + + @After + public void tearDown() throws Exception { + EnvFactory.getEnv().cleanClusterEnvironment(); + } + + @Test + public void testRegionMigratePreservesDeletionWithMultiDataDirs() throws Exception { + try (Connection 
connection = EnvFactory.getEnv().getTableConnection(); + Statement statement = connection.createStatement()) { + statement.execute("CREATE DATABASE test"); + statement.execute("USE test"); + statement.execute("CREATE TABLE t1 (s1 INT64 FIELD)"); + statement.execute("INSERT INTO t1 (time, s1) VALUES (100, 100), (200, 200), (300, 300)"); + statement.execute("FLUSH"); + statement.execute("DELETE FROM t1 WHERE time <= 200"); + statement.execute("FLUSH"); + + Map<Integer, Pair<Integer, Set<Integer>>> dataRegionMapWithLeader = + getDataRegionMapWithLeader(statement); + int dataRegionIdForTest = + dataRegionMapWithLeader.keySet().stream().max(Integer::compareTo).orElseThrow(); + assertDeletionVisibleOnAllReplicas(statement, dataRegionIdForTest, 1); + + Pair<Integer, Set<Integer>> leaderAndNodes = dataRegionMapWithLeader.get(dataRegionIdForTest); + Set<Integer> allDataNodes = getAllDataNodes(statement); + int leaderId = leaderAndNodes.getLeft(); + int followerId = + leaderAndNodes.getRight().stream().filter(id -> id != leaderId).findFirst().orElseThrow(); + int destDataNodeId = + allDataNodes.stream() + .filter(id -> id != leaderId && id != followerId) + .findFirst() + .orElseThrow(); + + statement.execute( + String.format( + "migrate region %d from %d to %d", dataRegionIdForTest, leaderId, destDataNodeId)); + + final int finalDestDataNodeId = destDataNodeId; + Awaitility.await() + .atMost(10, TimeUnit.MINUTES) + .pollDelay(1, TimeUnit.SECONDS) + .pollInterval(2, TimeUnit.SECONDS) + .untilAsserted( + () -> { + try (ResultSet showRegions = statement.executeQuery("SHOW REGIONS")) { + boolean migrated = false; + while (showRegions.next()) { + if (showRegions.getInt("RegionId") == dataRegionIdForTest + && showRegions.getInt("DataNodeId") == finalDestDataNodeId) { + migrated = true; + break; + } + } + Assert.assertTrue(migrated); + } + }); + + assertDeletionVisibleOnAllReplicas(statement, dataRegionIdForTest, 1); + } + } + + private void assertDeletionVisibleOnAllReplicas( + Statement statement, int dataRegionId, int expectedCount) throws 
Exception { + Set<Integer> replicaDataNodeIds = getReplicaDataNodeIds(statement, dataRegionId); + for (int dataNodeId : replicaDataNodeIds) { + DataNodeWrapper dataNodeWrapper = + EnvFactory.getEnv().dataNodeIdToWrapper(dataNodeId).orElseThrow(); + Awaitility.await() + .atMost(2, TimeUnit.MINUTES) + .pollDelay(500, TimeUnit.MILLISECONDS) + .pollInterval(1, TimeUnit.SECONDS) + .untilAsserted(() -> assertDeletionVisibleOnReplica(dataNodeWrapper, expectedCount)); + } + } + + private void assertDeletionVisibleOnReplica(DataNodeWrapper dataNodeWrapper, int expectedCount) + throws Exception { + try (Connection connection = + EnvFactory.getEnv() + .getConnection( + dataNodeWrapper, + SessionConfig.DEFAULT_USER, + SessionConfig.DEFAULT_PASSWORD, + BaseEnv.TABLE_SQL_DIALECT); + Statement dataNodeStatement = connection.createStatement()) { + dataNodeStatement.execute("USE test"); + try (ResultSet countResultSet = dataNodeStatement.executeQuery("SELECT COUNT(s1) FROM t1")) { + Assert.assertTrue(countResultSet.next()); + Assert.assertEquals(expectedCount, countResultSet.getLong(1)); + } + try (ResultSet deletedRangeResultSet = + dataNodeStatement.executeQuery("SELECT s1 FROM t1 WHERE time <= 200")) { + Assert.assertFalse(deletedRangeResultSet.next()); + } + } + } + + private Set<Integer> getReplicaDataNodeIds(Statement statement, int dataRegionId) + throws Exception { + Set<Integer> replicaDataNodeIds = new HashSet<>(); + try (ResultSet showRegions = statement.executeQuery("SHOW REGIONS")) { + while (showRegions.next()) { + if ("DataRegion".equals(showRegions.getString("Type")) + && showRegions.getInt("RegionId") == dataRegionId) { + replicaDataNodeIds.add(showRegions.getInt("DataNodeId")); + } + } + } + Assert.assertFalse(replicaDataNodeIds.isEmpty()); + return replicaDataNodeIds; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotLoader.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotLoader.java index 17136cd24fbb4..588b0e21956f9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotLoader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotLoader.java @@ -146,9 +146,14 @@ private DataRegion loadSnapshotFromMultipleDirs() { try { deleteAllFilesInDataDirs(); LOGGER.info(StorageEngineMessages.REMOVE_ALL_DATA_FILES_IN_ORIGINAL_DIR); + // IoTConsensus may spread the fragments of one snapshot across several receive folders. + // The fileTarget map must be shared across all of them so that a tsfile and its companion + // files (resource, exclusive mods, etc.) are relinked to the same data dir even when their + // fragments were received on different disks. + Map<String, String> fileTarget = new HashMap<>(); for (String path : snapshotPaths) { File snapshotDir = new File(path); - createLinksFromSnapshotDirToDataDirWithoutLog(snapshotDir); + createLinksFromSnapshotDirToDataDirWithoutLog(snapshotDir, fileTarget); loadCompressionRatio(snapshotDir); } return loadSnapshot(); @@ -170,7 +175,7 @@ private DataRegion loadSnapshotWithoutLog() { } LOGGER.info(StorageEngineMessages.MOVING_SNAPSHOT_FILE_TO_DATA_DIRS); File snapshotDir = new File(snapshotPath); - createLinksFromSnapshotDirToDataDirWithoutLog(snapshotDir); + createLinksFromSnapshotDirToDataDirWithoutLog(snapshotDir, new HashMap<>()); loadCompressionRatio(snapshotDir); return loadSnapshot(); } catch (IOException | DiskSpaceInsufficientException e) { @@ -294,7 +299,8 @@ private void deleteAllFilesInDataDirs() throws IOException { } } - private void createLinksFromSnapshotDirToDataDirWithoutLog(File sourceDir) + private void createLinksFromSnapshotDirToDataDirWithoutLog( + File sourceDir, Map<String, String> fileTarget) throws IOException, DiskSpaceInsufficientException { if (!sourceDir.exists()) { throw new IOException( @@ 
-340,7 +346,7 @@ private void createLinksFromSnapshotDirToDataDirWithoutLog(File sourceDir) + dataRegionId + File.separator + timePartitionFolder.getName(); - createLinksFromSnapshotToSourceDir(targetSuffix, files, folderManager); + createLinksFromSnapshotToSourceDir(targetSuffix, files, folderManager, fileTarget); } } @@ -359,7 +365,7 @@ private void createLinksFromSnapshotDirToDataDirWithoutLog(File sourceDir) + dataRegionId + File.separator + timePartitionFolder.getName(); - createLinksFromSnapshotToSourceDir(targetSuffix, files, folderManager); + createLinksFromSnapshotToSourceDir(targetSuffix, files, folderManager, fileTarget); } } } @@ -406,8 +412,11 @@ private File createLinksFromSnapshotToSourceDir( } private void createLinksFromSnapshotToSourceDir( - String targetSuffix, File[] files, FolderManager folderManager) throws IOException { - Map<String, String> fileTarget = new HashMap<>(); + String targetSuffix, + File[] files, + FolderManager folderManager, + Map<String, String> fileTarget) + throws IOException { for (File file : files) { checkTsFileResourceExists(file); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/IoTDBSnapshotTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/IoTDBSnapshotTest.java index d042d2a2ae8fa..ba7c9310945aa 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/IoTDBSnapshotTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/IoTDBSnapshotTest.java @@ -52,7 +52,9 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; import static org.apache.iotdb.consensus.iot.IoTConsensusServerImpl.SNAPSHOT_DIR_NAME; import static org.apache.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR; @@ -258,6 +260,51 @@ public void testLoadSnapshotSpreadAcrossReceiveFolders() 
loadSnapshotSpreadAcrossReceiveFolders(false); } + /** + * When IoTConsensus spreads a tsfile and its exclusive mods across different receive folders, the + * loader must still relink them to the same data dir. Otherwise the mods file is not found next + * to the tsfile and deletion markers are silently ignored. + */ + @Test + public void testLoadSnapshotKeepsTsFileAndModsOnSameDataDirWhenFragmentsAreSpread() + throws IOException, WriteProcessException { + String[][] originDataDirs = IoTDBDescriptor.getInstance().getConfig().getTierDataDirs(); + IoTDBDescriptor.getInstance().getConfig().setTierDataDirs(testDataDirs); + TierManager.getInstance().resetFolders(); + String recvBase0 = "target" + File.separator + "recv-snapshot-mods-0"; + String recvBase1 = "target" + File.separator + "recv-snapshot-mods-1"; + File recvFolder0 = new File(recvBase0, SNAPSHOT_DIR_NAME); + File recvFolder1 = new File(recvBase1, SNAPSHOT_DIR_NAME); + try { + Assert.assertTrue(recvFolder0.mkdirs()); + Assert.assertTrue(recvFolder1.mkdirs()); + + writeSnapshotFragmentWithExclusiveModsSpread(recvFolder0.getAbsolutePath(), 0, recvFolder1); + + DataRegion dataRegion = + new SnapshotLoader( + Arrays.asList(recvFolder0.getAbsolutePath(), recvFolder1.getAbsolutePath()), + testSgName, + "0") + .loadSnapshotForStateMachine(); + + Assert.assertNotNull(dataRegion); + TsFileResource resource = dataRegion.getTsFileManager().getTsFileList(true).get(0); + File tsFile = resource.getTsFile(); + File modsFile = + org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile + .getExclusiveMods(tsFile); + Assert.assertTrue(modsFile.exists()); + Assert.assertEquals( + tsFile.getParentFile().getAbsolutePath(), modsFile.getParentFile().getAbsolutePath()); + } finally { + FileUtils.recursivelyDeleteFolder(recvBase0); + FileUtils.recursivelyDeleteFolder(recvBase1); + IoTDBDescriptor.getInstance().getConfig().setTierDataDirs(originDataDirs); + TierManager.getInstance().resetFolders(); + } + } + /** * 
The fragments of one snapshot are disjoint across the receive folders, so the order in which * the folders are relinked must not change the loaded data. This loads the same spread snapshot @@ -343,6 +390,41 @@ private void writeSnapshotFragment(String recvSnapshotDir, int i) resource.serialize(); } + private void writeSnapshotFragmentWithExclusiveModsSpread( + String tsFileRecvSnapshotDir, int i, File modsRecvFolder) + throws IOException, WriteProcessException { + writeSnapshotFragment(tsFileRecvSnapshotDir, i); + String tsFileName = String.format("%d-%d-0-0.tsfile", i + 1, i + 1); + File tsFile = + new File( + tsFileRecvSnapshotDir + + File.separator + + "sequence" + + File.separator + + testSgName + + File.separator + + "0" + + File.separator + + "0" + + File.separator + + tsFileName); + File sourceMods = + org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile.getExclusiveMods( + tsFile); + Assert.assertTrue(sourceMods.exists() || sourceMods.createNewFile()); + + File targetModsDir = + new File( + modsRecvFolder, + "sequence" + File.separator + testSgName + File.separator + "0" + File.separator + "0"); + Assert.assertTrue(targetModsDir.exists() || targetModsDir.mkdirs()); + Files.copy( + sourceMods.toPath(), + new File(targetModsDir, sourceMods.getName()).toPath(), + java.nio.file.StandardCopyOption.REPLACE_EXISTING); + Files.delete(sourceMods.toPath()); + } + @Ignore("Need manual execution to specify different disks") @Test public void testLoadSnapshotNoHardLink() @@ -511,12 +593,23 @@ public void testFileTargetRecordedWhenHardLinkSuccess() throws Exception { Method method = SnapshotLoader.class.getDeclaredMethod( - "createLinksFromSnapshotToSourceDir", String.class, File[].class, FolderManager.class); + "createLinksFromSnapshotToSourceDir", + String.class, + File[].class, + FolderManager.class, + Map.class); method.setAccessible(true); SnapshotLoader loader = new SnapshotLoader("dummy", "root.testsg", "0"); - method.invoke(loader, 
targetSuffix, files, folderManager); + // Tracks fileKey -> chosen data dir, so files sharing a fileKey land in the same dir. + Map<String, String> fileTarget = new HashMap<>(); + method.invoke(loader, targetSuffix, files, folderManager, fileTarget); + + // The shared fileKey must be recorded exactly once, pointing at one of the data dirs. + String fileKey = tsFile.getName().split("\\.")[0]; + Assert.assertEquals(1, fileTarget.size()); + Assert.assertTrue(Arrays.asList(dataDirs).contains(fileTarget.get(fileKey))); // verify: only ONE dir contains all three files int hitDirCount = 0; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/disk/strategy/MinFolderOccupiedSpaceFirstStrategy.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/disk/strategy/MinFolderOccupiedSpaceFirstStrategy.java index 285b8b1d5b07a..6a893d47aa1a2 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/disk/strategy/MinFolderOccupiedSpaceFirstStrategy.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/disk/strategy/MinFolderOccupiedSpaceFirstStrategy.java @@ -25,6 +25,7 @@ import org.apache.iotdb.commons.utils.TestOnly; import java.io.IOException; +import java.io.UncheckedIOException; /** * Selects the folder with the least occupied space. @@ -118,7 +119,7 @@ private void refreshOccupiedSpace() { } try { cachedOccupiedSpace[i] = JVMCommonUtils.getOccupiedSpace(folder); - } catch (IOException e) { + } catch (IOException | UncheckedIOException e) { LOGGER.error(UtilMessages.CANNOT_CALCULATE_OCCUPIED_SPACE, folder, e); cachedOccupiedSpace[i] = Long.MAX_VALUE; }