diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ProcedureInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ProcedureInfo.java index 5589fdd37994b..605bd65c8a0cd 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ProcedureInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ProcedureInfo.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.snapshot.SnapshotProcessor; import org.apache.iotdb.commons.utils.FileUtils; +import org.apache.iotdb.commons.utils.IOUtils; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.confignode.consensus.request.write.procedure.DeleteProcedurePlan; import org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan; @@ -163,8 +164,16 @@ private static Optional loadProcedure(Path procedureFilePath) { try (FileInputStream fis = new FileInputStream(procedureFilePath.toFile())) { Procedure procedure = null; try (FileChannel channel = fis.getChannel()) { - ByteBuffer byteBuffer = ByteBuffer.allocate(PROCEDURE_LOAD_BUFFER_SIZE); - if (channel.read(byteBuffer) > 0) { + final long fileSize = channel.size(); + if (fileSize > PROCEDURE_LOAD_BUFFER_SIZE) { + throw new IOException( + String.format( + "Procedure file %s exceeds the load buffer limit %s, actual size %s", + procedureFilePath, PROCEDURE_LOAD_BUFFER_SIZE, fileSize)); + } + ByteBuffer byteBuffer = ByteBuffer.allocate((int) fileSize); + if (fileSize > 0) { + IOUtils.readFully(channel, byteBuffer); byteBuffer.flip(); procedure = ProcedureFactory.getInstance().create(byteBuffer); byteBuffer.clear(); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/writelog/io/SingleFileLogReader.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/writelog/io/SingleFileLogReader.java index f67cfdc286a70..8e8df07cf367c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/writelog/io/SingleFileLogReader.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/writelog/io/SingleFileLogReader.java @@ -75,10 +75,7 @@ public boolean hasNext() { } buffer = new byte[logSize]; - int readLen = logStream.read(buffer, 0, logSize); - if (readLen < logSize) { - throw new IOException("Reach eof"); - } + logStream.readFully(buffer, 0, logSize); final long checkSum = logStream.readLong(); checkSummer.reset(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/datastructure/SerializableList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/datastructure/SerializableList.java index c6b7cee0f3afb..3870975fa1e4f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/datastructure/SerializableList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/datastructure/SerializableList.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.queryengine.transformation.datastructure; import org.apache.iotdb.commons.file.SystemFileFactory; +import org.apache.iotdb.commons.utils.IOUtils; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.service.TemporaryQueryDataFileService; @@ -60,7 +61,7 @@ default void deserialize() throws IOException { } init(); ByteBuffer byteBuffer = ByteBuffer.allocate(recorder.getSerializedByteLength()); - recorder.getFileChannel().read(byteBuffer); + IOUtils.readFully(recorder.getFileChannel(), byteBuffer); byteBuffer.flip(); deserialize(byteBuffer); recorder.closeFile(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/logfile/FakeCRC32Deserializer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/logfile/FakeCRC32Deserializer.java index 46ce88825ecfb..9ecca265a9d4d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/logfile/FakeCRC32Deserializer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/logfile/FakeCRC32Deserializer.java @@ -27,7 +27,6 @@ import java.io.ByteArrayInputStream; import java.io.DataInputStream; -import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; @@ -67,9 +66,7 @@ public T deserialize(InputStream inputStream) throws IOException { } byte[] logBuffer = new byte[logLength]; - if (logLength < inputStream.read(logBuffer, 0, logLength)) { - throw new EOFException(); - } + dataInputStream.readFully(logBuffer, 0, logLength); T result = deserializer.deserialize(ByteBuffer.wrap(logBuffer)); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java index 692736474bad1..1501f50acc1bc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.schema.SchemaConstant; import org.apache.iotdb.commons.schema.node.role.IDatabaseMNode; import org.apache.iotdb.commons.schema.node.utils.IMNodeFactory; +import org.apache.iotdb.commons.utils.IOUtils; import org.apache.iotdb.commons.utils.PathUtils; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.consensus.ConsensusFactory; @@ -360,7 +361,7 @@ private void initFileHeader() throws IOException, MetadataException { lastSGAddr = 0L; pageManager = new BTreePageManager(channel, pmtFile, -1, logPath); } else { - channel.read(headerContent); + IOUtils.readFully(channel, headerContent); headerContent.clear(); lastPageIndex = ReadWriteIOUtils.readInt(headerContent); dataTTL = ReadWriteIOUtils.readLong(headerContent); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/log/SchemaFileLogReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/log/SchemaFileLogReader.java index c723e38dfaf00..227ea27f5056e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/log/SchemaFileLogReader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/log/SchemaFileLogReader.java @@ -26,6 +26,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.DataInputStream; +import java.io.EOFException; import java.io.File; import java.io.FileInputStream; import java.io.IOException; @@ -91,8 +93,9 @@ public List collectUpdatedEntries() throws IOException, SchemaFileLogCor } } - // corrupted within one entry - if (inputStream.read(tempBytes, 1, tempBytes.length - 1) < tempBytes.length - 2) { + try { + new DataInputStream(inputStream).readFully(tempBytes, 1, tempBytes.length - 1); + } catch (EOFException e) { throw new SchemaFileLogCorruptedException(logFile.getAbsolutePath(), "incomplete entry."); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageIOChannel.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageIOChannel.java index 044b7339a82e8..4e9d6524f1996 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageIOChannel.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageIOChannel.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.pagemgr; import org.apache.iotdb.commons.exception.MetadataException; +import org.apache.iotdb.commons.utils.IOUtils; import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.ISchemaPage; import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.SchemaFileConfig; import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.log.SchemaFileLogReader; @@ -107,7 +108,7 @@ public void loadFromFileToBuffer(ByteBuffer dst, int pageIndex) throws IOExcepti if (!readChannel.isOpen()) { readChannel = FileChannel.open(pmtFile.toPath(), StandardOpenOption.READ); } - readChannel.read(dst, getPageAddress(pageIndex)); + IOUtils.readFully(readChannel, dst, getPageAddress(pageIndex)); } // region Flush Strategy diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagLogFile.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagLogFile.java index 9cf9d51cf2119..512c9fcea576f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagLogFile.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagLogFile.java @@ -22,6 +22,7 @@ import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.exception.MetadataException; import org.apache.iotdb.commons.file.SystemFileFactory; +import org.apache.iotdb.commons.utils.IOUtils; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.commons.io.FileUtils; @@ -114,7 +115,7 @@ public static ByteBuffer parseByteBuffer(FileChannel fileChannel, long position) throws IOException { // Read the first block ByteBuffer byteBuffer = ByteBuffer.allocate(MAX_LENGTH); - fileChannel.read(byteBuffer, position); + IOUtils.readFully(fileChannel, byteBuffer, position); byteBuffer.flip(); if (byteBuffer.limit() > 0) { // This indicates that there is data at this position int firstInt = ReadWriteIOUtils.readInt(byteBuffer); // first int @@ -129,7 +130,7 @@ public static ByteBuffer parseByteBuffer(FileChannel fileChannel, long position) // read one offset, then use filechannel's read to read it byteBuffers.position(MAX_LENGTH * i); byteBuffers.limit(MAX_LENGTH * (i + 1)); - fileChannel.read(byteBuffers, nextPosition); + IOUtils.readFully(fileChannel, byteBuffers, nextPosition); byteBuffers.position(4 + i * Long.BYTES); } byteBuffers.limit(byteBuffers.capacity()); @@ -144,7 +145,10 @@ private List parseOffsetList(long position) throws IOException { blockOffset.add(position); // Read the first block ByteBuffer byteBuffer = ByteBuffer.allocate(MAX_LENGTH); - fileChannel.read(byteBuffer, position); + if (position == fileChannel.size()) { + return blockOffset; + } + IOUtils.readFully(fileChannel, byteBuffer, position); byteBuffer.flip(); if (byteBuffer.limit() > 0) { // This indicates that there is data at this position int firstInt = ReadWriteIOUtils.readInt(byteBuffer); // first int @@ -167,7 +171,7 @@ private List parseOffsetList(long position) throws IOException { // read blockBuffer.position(MAX_LENGTH * i); blockBuffer.limit(MAX_LENGTH * (i + 1)); - fileChannel.read(blockBuffer, blockOffset.get(i)); + IOUtils.readFully(fileChannel, blockBuffer, blockOffset.get(i)); blockBuffer.position(4 + i * Long.BYTES); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALFileVersion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALFileVersion.java index e3d374551b115..8a6d2fbf890c1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALFileVersion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALFileVersion.java @@ -18,6 +18,8 @@ */ package org.apache.iotdb.db.storageengine.dataregion.wal.io; +import org.apache.iotdb.commons.utils.IOUtils; + import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; @@ -63,7 +65,7 @@ public static WALFileVersion getVersion(FileChannel channel) throws IOException continue; } ByteBuffer buffer = ByteBuffer.allocate(version.versionBytes.length); - channel.read(buffer); + IOUtils.readFully(channel, buffer); buffer.flip(); String versionString = new String(buffer.array(), StandardCharsets.UTF_8); if (version.versionString.equals(versionString)) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java index 1827bfc9365ea..a56c145fe2b58 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.storageengine.dataregion.wal.io; import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.iotdb.commons.utils.IOUtils; import org.apache.iotdb.db.service.metrics.WritingMetrics; import org.apache.iotdb.db.utils.MmapUtil; @@ -85,7 +86,8 @@ private void getEndOffset() throws IOException { if (version == WALFileVersion.V2) { // New Version ByteBuffer magicStringBuffer = ByteBuffer.allocate(version.getVersionBytes().length); - channel.read(magicStringBuffer, channel.size() - version.getVersionBytes().length); + IOUtils.readFully( + channel, magicStringBuffer, channel.size() - version.getVersionBytes().length); magicStringBuffer.flip(); if (logFile.getName().endsWith(IoTDBConstant.WAL_CHECKPOINT_FILE_SUFFIX) || !new String(magicStringBuffer.array(), StandardCharsets.UTF_8) @@ -105,7 +107,8 @@ private void getEndOffset() throws IOException { } // Old version ByteBuffer magicStringBuffer = ByteBuffer.allocate(version.getVersionBytes().length); - channel.read(magicStringBuffer, channel.size() - version.getVersionBytes().length); + IOUtils.readFully( + channel, magicStringBuffer, channel.size() - version.getVersionBytes().length); magicStringBuffer.flip(); if (!new String(magicStringBuffer.array(), StandardCharsets.UTF_8) .equals(version.getVersionString())) { @@ -117,7 +120,7 @@ private void getEndOffset() throws IOException { } } // Read the metadata size - channel.read(metadataSizeBuf, position); + IOUtils.readFully(channel, metadataSizeBuf, position); metadataSizeBuf.flip(); int metadataSize = metadataSizeBuf.getInt(); endOffset = channel.size() - version.getVersionBytes().length - Integer.BYTES - metadataSize; @@ -237,9 +240,7 @@ private void loadNextSegmentV2() throws IOException { compressedBuffer.clear(); // limit the buffer to prevent it from reading too much byte than expected compressedBuffer.limit(segmentInfo.dataInDiskSize); - if (readWALBufferFromChannel(compressedBuffer) != segmentInfo.dataInDiskSize) { - throw new IOException("Unexpected end of file"); - } + readWALBufferFullyFromChannel(compressedBuffer); compressedBuffer.flip(); IUnCompressor unCompressor = IUnCompressor.getUnCompressor(segmentInfo.compressionType); uncompressWALBuffer(compressedBuffer, dataBuffer, unCompressor); @@ -255,9 +256,7 @@ private void loadNextSegmentV2() throws IOException { // limit the buffer to prevent it from reading too much byte than expected dataBuffer.limit(segmentInfo.dataInDiskSize); - if (readWALBufferFromChannel(dataBuffer) != segmentInfo.dataInDiskSize) { - throw new IOException("Unexpected end of file"); - } + readWALBufferFullyFromChannel(dataBuffer); } dataBuffer.flip(); } @@ -301,7 +300,7 @@ public void skipToGivenLogicalPosition(long pos) throws IOException { if (segmentInfo.compressionType != CompressionType.UNCOMPRESSED) { compressedBuffer = ByteBuffer.allocateDirect(segmentInfo.dataInDiskSize); - readWALBufferFromChannel(compressedBuffer); + readWALBufferFullyFromChannel(compressedBuffer); compressedBuffer.flip(); IUnCompressor unCompressor = IUnCompressor.getUnCompressor(segmentInfo.compressionType); dataBuffer = ByteBuffer.allocateDirect(segmentInfo.uncompressedSize); @@ -310,7 +309,7 @@ public void skipToGivenLogicalPosition(long pos) throws IOException { compressedBuffer = null; } else { dataBuffer = ByteBuffer.allocateDirect(segmentInfo.dataInDiskSize); - readWALBufferFromChannel(dataBuffer); + readWALBufferFullyFromChannel(dataBuffer); dataBuffer.flip(); } @@ -349,7 +348,7 @@ public WALMetaData getWALMetaData() throws IOException { private SegmentInfo getNextSegmentInfo() throws IOException { segmentHeaderWithoutCompressedSizeBuffer.clear(); - channel.read(segmentHeaderWithoutCompressedSizeBuffer); + readWALBufferFullyFromChannel(segmentHeaderWithoutCompressedSizeBuffer); segmentHeaderWithoutCompressedSizeBuffer.flip(); SegmentInfo info = new SegmentInfo(); info.compressionType = @@ -357,7 +356,7 @@ private SegmentInfo getNextSegmentInfo() throws IOException { info.dataInDiskSize = segmentHeaderWithoutCompressedSizeBuffer.getInt(); if (info.compressionType != CompressionType.UNCOMPRESSED) { compressedSizeBuffer.clear(); - readWALBufferFromChannel(compressedSizeBuffer); + readWALBufferFullyFromChannel(compressedSizeBuffer); compressedSizeBuffer.flip(); info.uncompressedSize = compressedSizeBuffer.getInt(); } else { @@ -373,6 +372,13 @@ private int readWALBufferFromChannel(ByteBuffer buffer) throws IOException { return size; } + private void readWALBufferFullyFromChannel(ByteBuffer buffer) throws IOException { + long startTime = System.nanoTime(); + int size = buffer.remaining(); + IOUtils.readFully(channel, buffer); + WritingMetrics.getInstance().recordWALRead(size, System.nanoTime() - startTime); + } + private void uncompressWALBuffer( ByteBuffer compressed, ByteBuffer uncompressed, IUnCompressor unCompressor) throws IOException { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALMetaData.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALMetaData.java index ba9211656ef03..9c707c788a33b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALMetaData.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALMetaData.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.storageengine.dataregion.wal.io; +import org.apache.iotdb.commons.utils.IOUtils; import org.apache.iotdb.consensus.iot.log.ConsensusReqReader; import org.apache.iotdb.db.storageengine.dataregion.wal.exception.BrokenWALFileException; import org.apache.iotdb.db.utils.SerializedSize; @@ -143,12 +144,12 @@ public static WALMetaData readFromWALFile(File logFile, FileChannel channel) thr ByteBuffer metadataSizeBuf = ByteBuffer.allocate(Integer.BYTES); WALFileVersion version = WALFileVersion.getVersion(channel); position = channel.size() - Integer.BYTES - (version.getVersionBytes().length); - channel.read(metadataSizeBuf, position); + IOUtils.readFully(channel, metadataSizeBuf, position); metadataSizeBuf.flip(); // load metadata int metadataSize = metadataSizeBuf.getInt(); ByteBuffer metadataBuf = ByteBuffer.allocate(metadataSize); - channel.read(metadataBuf, position - metadataSize); + IOUtils.readFully(channel, metadataBuf, position - metadataSize); metadataBuf.flip(); metaData = WALMetaData.deserialize(metadataBuf); // versions before V1.3, should recover memTable ids from entries @@ -157,8 +158,8 @@ public static WALMetaData readFromWALFile(File logFile, FileChannel channel) thr for (int size : metaData.buffersSize) { channel.position(offset); ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES); - channel.read(buffer); - buffer.clear(); + IOUtils.readFully(channel, buffer); + buffer.flip(); metaData.memTablesId.add(buffer.getLong()); offset += size; } @@ -175,7 +176,8 @@ public static WALMetaData readFromWALFile(File logFile, FileChannel channel) thr private static boolean isValidMagicString(FileChannel channel) throws IOException { ByteBuffer magicStringBytes = ByteBuffer.allocate(WALFileVersion.V2.getVersionBytes().length); - channel.read(magicStringBytes, channel.size() - WALFileVersion.V2.getVersionBytes().length); + IOUtils.readFully( + channel, magicStringBytes, channel.size() - WALFileVersion.V2.getVersionBytes().length); magicStringBytes.flip(); String magicString = new String(magicStringBytes.array(), StandardCharsets.UTF_8); return magicString.equals(WALFileVersion.V2.getVersionString()) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRepairWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRepairWriter.java index d1b27060c90b6..46598561a5bb2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRepairWriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRepairWriter.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.storageengine.dataregion.wal.recover; +import org.apache.iotdb.commons.utils.IOUtils; import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALFileVersion; import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALMetaData; import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALWriter; @@ -65,7 +66,7 @@ private String readTailMagic(WALFileVersion version) throws IOException { } try (FileChannel channel = FileChannel.open(logFile.toPath(), StandardOpenOption.READ)) { ByteBuffer magicStringBytes = ByteBuffer.allocate(size); - channel.read(magicStringBytes, channel.size() - size); + IOUtils.readFully(channel, magicStringBytes, channel.size() - size); magicStringBytes.flip(); return new String(magicStringBytes.array(), StandardCharsets.UTF_8); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java index 8567d30660def..1395eaa420dcc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java @@ -44,6 +44,7 @@ import javax.annotation.Nonnull; import java.io.ByteArrayInputStream; +import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; @@ -304,9 +305,7 @@ private InputStream createTsFileDataInputStream() { protected void deserializeTsFileDataByte(final InputStream stream) throws IOException { final int size = ReadWriteIOUtils.readInt(stream); this.chunkData = new byte[size]; - if (size != stream.read(chunkData)) { - throw new IOException("TsFileData byte array read error, size mismatch."); - } + new DataInputStream(stream).readFully(chunkData); } private void deserializeEntireChunk(final InputStream stream, final TsFileIOWriter writer) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTsFileResponse.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTsFileResponse.java index 5cc7f40cd79af..3d94e1a493364 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTsFileResponse.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTsFileResponse.java @@ -189,20 +189,13 @@ private synchronized Optional generateNextTsFile PipeDataNodeResourceManager.memory().forceAllocateForTsFileWithRetry(bufferSize); final byte[] readBuffer = new byte[(int) bufferSize]; - final int readLength = reader.read(readBuffer); - if (readLength != bufferSize) { - memoryBlock.close(); - throw new SubscriptionException( - String.format( - "inconsistent read length (broken invariant), expected: %s, actual: %s", - bufferSize, readLength)); - } + reader.readFully(readBuffer); // generate subscription poll response with piece payload final CachedSubscriptionPollResponse response = new CachedSubscriptionPollResponse( SubscriptionPollResponseType.FILE_PIECE.getType(), - new FilePiecePayload(tsFile.getName(), writingOffset + readLength, readBuffer), + new FilePiecePayload(tsFile.getName(), writingOffset + bufferSize, readBuffer), commitContext); // set fixed memory block for response diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/FileSpillerReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/FileSpillerReader.java index 98ca07dcd0381..e622194265acc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/FileSpillerReader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/FileSpillerReader.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.utils.sort; import org.apache.iotdb.commons.exception.IoTDBException; +import org.apache.iotdb.commons.utils.IOUtils; import org.apache.iotdb.db.utils.datastructure.MergeSortKey; import org.apache.iotdb.rpc.TSStatusCode; @@ -93,10 +94,11 @@ private long read() throws IoTDBException { if (readLen == -1) { return -1; } + IOUtils.readFully(fileChannel, bytes); bytes.flip(); int capacity = bytes.getInt(); ByteBuffer tsBlockBytes = ByteBuffer.allocate(capacity); - fileChannel.read(tsBlockBytes); + IOUtils.readFully(fileChannel, tsBlockBytes); tsBlockBytes.flip(); TsBlock cachedTsBlock = serde.deserialize(tsBlockBytes); cacheBlocks.add(cachedTsBlock); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/schemaregion/logfile/FakeCRC32DeserializerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/schemaregion/logfile/FakeCRC32DeserializerTest.java new file mode 100644 index 0000000000000..b9f583119778e --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/schemaregion/logfile/FakeCRC32DeserializerTest.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.schemaengine.schemaregion.logfile; + +import org.junit.Assert; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; + +public class FakeCRC32DeserializerTest { + + @Test + public void deserializeReadsCompletePayloadAfterShortRead() throws IOException { + byte[] payload = new byte[] {1, 2, 3, 4}; + + byte[] deserialized = + new FakeCRC32Deserializer<>(new ByteBufferDeserializer()) + .deserialize(new OneByteAtATimeInputStream(serialize(payload, true))); + + Assert.assertArrayEquals(payload, deserialized); + } + + @Test + public void deserializeThrowsWhenPayloadIsTruncated() throws IOException { + byte[] bytes = serialize(new byte[] {1, 2}, false, false); + + Assert.assertThrows( + EOFException.class, + () -> + new FakeCRC32Deserializer<>(new ByteBufferDeserializer()) + .deserialize(new OneByteAtATimeInputStream(bytes))); + } + + private static byte[] serialize(byte[] payload, boolean complete) throws IOException { + return serialize(payload, complete, true); + } + + private static byte[] serialize(byte[] payload, boolean complete, boolean writeValidationCode) + throws IOException { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) { + dataOutputStream.writeInt(complete ? payload.length : payload.length + 1); + dataOutputStream.write(payload); + if (writeValidationCode) { + dataOutputStream.writeLong(0L); + } + } + return outputStream.toByteArray(); + } + + private static class ByteBufferDeserializer implements IDeserializer { + + @Override + public byte[] deserialize(ByteBuffer buffer) { + byte[] bytes = new byte[buffer.remaining()]; + buffer.get(bytes); + return bytes; + } + } + + private static class OneByteAtATimeInputStream extends InputStream { + + private final byte[] bytes; + private int index; + + private OneByteAtATimeInputStream(byte[] bytes) { + this.bytes = bytes; + } + + @Override + public int read() { + return index < bytes.length ? bytes[index++] & 0xFF : -1; + } + + @Override + public int read(byte[] b, int off, int len) { + if (len == 0) { + return 0; + } + if (index >= bytes.length) { + return -1; + } + b[off] = bytes[index++]; + return 1; + } + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagLogFileTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagLogFileTest.java new file mode 100644 index 0000000000000..9f3688d76fa92 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagLogFileTest.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.schemaengine.schemaregion.tag; + +import org.apache.commons.io.FileUtils; +import org.apache.tsfile.utils.Pair; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; +import java.nio.file.Files; +import java.util.Collections; +import java.util.Map; + +public class TagLogFileTest { + + private File tempDir; + + @After + public void tearDown() throws Exception { + if (tempDir != null) { + FileUtils.deleteDirectory(tempDir); + } + } + + @Test + public void writeAppendsFirstRecordWithoutReadingPastFileEnd() throws Exception { + tempDir = Files.createTempDirectory("tag-log-file").toFile(); + Map tags = Collections.singletonMap("tag", "value"); + Map attributes = Collections.singletonMap("attr", "value"); + + try (TagLogFile tagLogFile = new TagLogFile(tempDir.getAbsolutePath(), "tag.log")) { + long offset = tagLogFile.write(tags, attributes); + Pair, Map> result = tagLogFile.read(offset); + + Assert.assertEquals(tags, result.left); + Assert.assertEquals(attributes, result.right); + } + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/TimeWindowStateProgressIndex.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/TimeWindowStateProgressIndex.java index bb1b2c1ce5e18..02139d2058d83 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/TimeWindowStateProgressIndex.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/TimeWindowStateProgressIndex.java @@ -29,6 +29,7 @@ import javax.annotation.Nonnull; +import java.io.DataInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -279,13 +280,7 @@ public static TimeWindowStateProgressIndex deserializeFrom(InputStream stream) continue; } final byte[] body = new byte[length]; - final int readLen = stream.read(body); - if (readLen != length) { - throw new IOException( - String.format( - "The intended read length is %s but %s is actually read when deserializing TimeProgressIndex, ProgressIndex: %s", - length, readLen, timeWindowStateProgressIndex)); - } + new DataInputStream(stream).readFully(body); final ByteBuffer dstBuffer = ByteBuffer.wrap(body); timeWindowStateProgressIndex.timeSeries2TimestampWindowBufferPairMap.put( timeSeries, new Pair<>(timestamp, dstBuffer)); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java index fc739b58ea641..011a99e8a1dea 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java @@ -21,6 +21,7 @@ import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.trigger.exception.TriggerJarTooLargeException; +import org.apache.iotdb.commons.utils.IOUtils; import org.apache.commons.io.FileUtils; import org.apache.tsfile.fileSystem.FSFactoryProducer; @@ -214,7 +215,7 @@ public static ByteBuffer transferToBytebuffer(String filePath) throws IOExceptio String.format("Size of file exceed %d bytes", Integer.MAX_VALUE)); } ByteBuffer byteBuffer = ByteBuffer.allocate((int) size); - fileChannel.read(byteBuffer); + IOUtils.readFully(fileChannel, byteBuffer); byteBuffer.flip(); return byteBuffer; } catch (Exception e) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/serializer/PlainQueueSerializer.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/serializer/PlainQueueSerializer.java index e2279ce06f4e3..7798acf28e08a 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/serializer/PlainQueueSerializer.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/serializer/PlainQueueSerializer.java @@ -20,6 +20,7 @@ package org.apache.iotdb.commons.pipe.datastructure.queue.serializer; import org.apache.iotdb.commons.pipe.datastructure.queue.ConcurrentIterableLinkedQueue; +import org.apache.iotdb.commons.utils.IOUtils; import org.apache.tsfile.utils.ReadWriteIOUtils; @@ -68,7 +69,7 @@ public void loadQueueFromFile( } int capacity = ReadWriteIOUtils.readInt(inputStream); ByteBuffer buffer = ByteBuffer.allocate(capacity); - channel.read(buffer); + IOUtils.readFully(channel, buffer); buffer.flip(); E element = elementDeserializationFunction.apply(buffer); if (element == null) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java index 498baeda5822f..f2c54cd2c9471 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java @@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory; import java.io.BufferedOutputStream; +import java.io.DataInputStream; import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; @@ -332,8 +333,8 @@ protected boolean sendBytes(final AirGapSocket socket, byte[] bytes) throws IOEx outputStream.flush(); final byte[] response = new byte[1]; - final int size = socket.getInputStream().read(response); - return size > 0 && Arrays.equals(AirGapOneByteResponse.OK, response); + new DataInputStream(socket.getInputStream()).readFully(response); + return Arrays.equals(AirGapOneByteResponse.OK, response); } protected boolean send(final AirGapSocket socket, final byte[] bytes) throws IOException { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/IOUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/IOUtils.java index f96ae8443b985..047dd6bfea5bb 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/IOUtils.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/IOUtils.java @@ -28,10 +28,12 @@ import org.apache.tsfile.utils.Pair; import java.io.DataInputStream; +import java.io.EOFException; import java.io.File; import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; import java.util.ArrayList; import java.util.List; import java.util.Optional; @@ -94,6 +96,26 @@ public static void writeInt( outputStream.write(encodingBuffer.array(), 0, Integer.BYTES); } + public static void readFully(FileChannel fileChannel, ByteBuffer buffer) throws IOException { + while (buffer.hasRemaining()) { + if (fileChannel.read(buffer) <= 0) { + throw new EOFException(); + } + } + } + + public static void readFully(FileChannel fileChannel, ByteBuffer buffer, long position) + throws IOException { + long currentPosition = position; + while (buffer.hasRemaining()) { + final int readBytes = fileChannel.read(buffer, currentPosition); + if (readBytes <= 0) { + throw new EOFException(); + } + currentPosition += readBytes; + } + } + /** * Read a string from the given stream. * @@ -120,7 +142,7 @@ public static String readString( strBuffer = new byte[length]; } - inputStream.read(strBuffer, 0, length); + inputStream.readFully(strBuffer, 0, length); return new String(strBuffer, 0, length, encoding); } return null; diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/utils/IOUtilsTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/utils/IOUtilsTest.java new file mode 100644 index 0000000000000..bf2cf546628a1 --- /dev/null +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/utils/IOUtilsTest.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.commons.utils; + +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.util.concurrent.atomic.AtomicInteger; + +public class IOUtilsTest { + + private static final String ENCODING = "UTF-8"; + + @Test + public void readStringReadsCompletePayloadAfterShortRead() throws IOException { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + IOUtils.writeString(outputStream, "abcdefg", ENCODING, null); + + try (DataInputStream inputStream = + new DataInputStream(new OneByteAtATimeInputStream(outputStream.toByteArray()))) { + Assert.assertEquals("abcdefg", IOUtils.readString(inputStream, ENCODING, null)); + } + } + + @Test + public void readStringThrowsWhenPayloadIsTruncated() throws IOException { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + IOUtils.writeInt(outputStream, 7, null); + outputStream.write(new byte[] {'a', 'b', 'c'}); + + try (DataInputStream inputStream = + new DataInputStream(new OneByteAtATimeInputStream(outputStream.toByteArray()))) { + Assert.assertThrows( + EOFException.class, () -> IOUtils.readString(inputStream, ENCODING, null)); + } + } + + @Test + public void readFullyReadsCompleteByteBufferAfterShortChannelRead() throws IOException { + byte[] bytes = new byte[] {1, 2, 3}; + FileChannel channel = mockOneByteAtATimeChannel(bytes); + ByteBuffer buffer = ByteBuffer.allocate(bytes.length); + + IOUtils.readFully(channel, buffer); + + Assert.assertArrayEquals(bytes, buffer.array()); + } + + @Test + public void readFullyReadsCompleteByteBufferFromPositionAfterShortChannelRead() + throws IOException { + byte[] bytes = new byte[] {1, 2, 3, 4, 5}; + FileChannel channel = mockOneByteAtATimeChannel(bytes); + ByteBuffer buffer = ByteBuffer.allocate(3); + + IOUtils.readFully(channel, buffer, 2); + + Assert.assertArrayEquals(new byte[] {3, 4, 5}, buffer.array()); + } + + @Test + public void readFullyThrowsWhenChannelIsTruncated() throws IOException { + FileChannel channel = mockOneByteAtATimeChannel(new byte[] {1, 2}); + ByteBuffer buffer = ByteBuffer.allocate(3); + + Assert.assertThrows(EOFException.class, () -> IOUtils.readFully(channel, buffer)); + } + + private static FileChannel mockOneByteAtATimeChannel(byte[] bytes) throws IOException { + FileChannel channel = Mockito.mock(FileChannel.class); + AtomicInteger index = new AtomicInteger(); + Mockito.when(channel.read(Mockito.any(ByteBuffer.class))) + .thenAnswer( + invocation -> { + ByteBuffer buffer = invocation.getArgument(0); + int currentIndex = index.getAndIncrement(); + if (currentIndex >= bytes.length) { + return -1; + } + buffer.put(bytes[currentIndex]); + return 1; + }); + Mockito.when(channel.read(Mockito.any(ByteBuffer.class), Mockito.anyLong())) + .thenAnswer( + invocation -> { + ByteBuffer buffer = invocation.getArgument(0); + long position = invocation.getArgument(1); + if (position >= bytes.length) { + return -1; + } + buffer.put(bytes[(int) position]); + return 1; + }); + return channel; + } + + private static class OneByteAtATimeInputStream extends InputStream { + + private final byte[] bytes; + private int index; + + private OneByteAtATimeInputStream(byte[] bytes) { + this.bytes = bytes; + } + + @Override + public int read() { + return index < bytes.length ? bytes[index++] & 0xFF : -1; + } + + @Override + public int read(byte[] b, int off, int len) { + if (len == 0) { + return 0; + } + if (index >= bytes.length) { + return -1; + } + b[off] = bytes[index++]; + return 1; + } + } +}