Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.commons.utils.IOUtils;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.consensus.request.write.procedure.DeleteProcedurePlan;
import org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan;
Expand Down Expand Up @@ -163,8 +164,16 @@ private static Optional<Procedure> loadProcedure(Path procedureFilePath) {
try (FileInputStream fis = new FileInputStream(procedureFilePath.toFile())) {
Procedure procedure = null;
try (FileChannel channel = fis.getChannel()) {
ByteBuffer byteBuffer = ByteBuffer.allocate(PROCEDURE_LOAD_BUFFER_SIZE);
if (channel.read(byteBuffer) > 0) {
final long fileSize = channel.size();
if (fileSize > PROCEDURE_LOAD_BUFFER_SIZE) {
throw new IOException(
String.format(
"Procedure file %s exceeds the load buffer limit %s, actual size %s",
procedureFilePath, PROCEDURE_LOAD_BUFFER_SIZE, fileSize));
}
ByteBuffer byteBuffer = ByteBuffer.allocate((int) fileSize);
if (fileSize > 0) {
IOUtils.readFully(channel, byteBuffer);
byteBuffer.flip();
procedure = ProcedureFactory.getInstance().create(byteBuffer);
byteBuffer.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,7 @@ public boolean hasNext() {
}
buffer = new byte[logSize];

int readLen = logStream.read(buffer, 0, logSize);
if (readLen < logSize) {
throw new IOException("Reach eof");
}
logStream.readFully(buffer, 0, logSize);

final long checkSum = logStream.readLong();
checkSummer.reset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.iotdb.db.queryengine.transformation.datastructure;

import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.commons.utils.IOUtils;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.service.TemporaryQueryDataFileService;

Expand Down Expand Up @@ -60,7 +61,7 @@ default void deserialize() throws IOException {
}
init();
ByteBuffer byteBuffer = ByteBuffer.allocate(recorder.getSerializedByteLength());
recorder.getFileChannel().read(byteBuffer);
IOUtils.readFully(recorder.getFileChannel(), byteBuffer);
byteBuffer.flip();
deserialize(byteBuffer);
recorder.closeFile();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -67,9 +66,7 @@ public T deserialize(InputStream inputStream) throws IOException {
}

byte[] logBuffer = new byte[logLength];
if (logLength < inputStream.read(logBuffer, 0, logLength)) {
throw new EOFException();
}
dataInputStream.readFully(logBuffer, 0, logLength);

T result = deserializer.deserialize(ByteBuffer.wrap(logBuffer));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.iotdb.commons.schema.SchemaConstant;
import org.apache.iotdb.commons.schema.node.role.IDatabaseMNode;
import org.apache.iotdb.commons.schema.node.utils.IMNodeFactory;
import org.apache.iotdb.commons.utils.IOUtils;
import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.consensus.ConsensusFactory;
Expand Down Expand Up @@ -360,7 +361,7 @@ private void initFileHeader() throws IOException, MetadataException {
lastSGAddr = 0L;
pageManager = new BTreePageManager(channel, pmtFile, -1, logPath);
} else {
channel.read(headerContent);
IOUtils.readFully(channel, headerContent);
headerContent.clear();
lastPageIndex = ReadWriteIOUtils.readInt(headerContent);
dataTTL = ReadWriteIOUtils.readLong(headerContent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.DataInputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
Expand Down Expand Up @@ -91,8 +93,9 @@ public List<byte[]> collectUpdatedEntries() throws IOException, SchemaFileLogCor
}
}

// corrupted within one entry
if (inputStream.read(tempBytes, 1, tempBytes.length - 1) < tempBytes.length - 2) {
try {
new DataInputStream(inputStream).readFully(tempBytes, 1, tempBytes.length - 1);
} catch (EOFException e) {
throw new SchemaFileLogCorruptedException(logFile.getAbsolutePath(), "incomplete entry.");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.pagemgr;

import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.utils.IOUtils;
import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.ISchemaPage;
import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.SchemaFileConfig;
import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.log.SchemaFileLogReader;
Expand Down Expand Up @@ -107,7 +108,7 @@ public void loadFromFileToBuffer(ByteBuffer dst, int pageIndex) throws IOExcepti
if (!readChannel.isOpen()) {
readChannel = FileChannel.open(pmtFile.toPath(), StandardOpenOption.READ);
}
readChannel.read(dst, getPageAddress(pageIndex));
IOUtils.readFully(readChannel, dst, getPageAddress(pageIndex));
}

// region Flush Strategy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.commons.utils.IOUtils;
import org.apache.iotdb.db.conf.IoTDBDescriptor;

import org.apache.commons.io.FileUtils;
Expand Down Expand Up @@ -114,7 +115,7 @@ public static ByteBuffer parseByteBuffer(FileChannel fileChannel, long position)
throws IOException {
// Read the first block
ByteBuffer byteBuffer = ByteBuffer.allocate(MAX_LENGTH);
fileChannel.read(byteBuffer, position);
IOUtils.readFully(fileChannel, byteBuffer, position);
byteBuffer.flip();
if (byteBuffer.limit() > 0) { // This indicates that there is data at this position
int firstInt = ReadWriteIOUtils.readInt(byteBuffer); // first int
Expand All @@ -129,7 +130,7 @@ public static ByteBuffer parseByteBuffer(FileChannel fileChannel, long position)
// read one offset, then use filechannel's read to read it
byteBuffers.position(MAX_LENGTH * i);
byteBuffers.limit(MAX_LENGTH * (i + 1));
fileChannel.read(byteBuffers, nextPosition);
IOUtils.readFully(fileChannel, byteBuffers, nextPosition);
byteBuffers.position(4 + i * Long.BYTES);
}
byteBuffers.limit(byteBuffers.capacity());
Expand All @@ -144,7 +145,10 @@ private List<Long> parseOffsetList(long position) throws IOException {
blockOffset.add(position);
// Read the first block
ByteBuffer byteBuffer = ByteBuffer.allocate(MAX_LENGTH);
fileChannel.read(byteBuffer, position);
if (position == fileChannel.size()) {
return blockOffset;
}
IOUtils.readFully(fileChannel, byteBuffer, position);
byteBuffer.flip();
if (byteBuffer.limit() > 0) { // This indicates that there is data at this position
int firstInt = ReadWriteIOUtils.readInt(byteBuffer); // first int
Expand All @@ -167,7 +171,7 @@ private List<Long> parseOffsetList(long position) throws IOException {
// read
blockBuffer.position(MAX_LENGTH * i);
blockBuffer.limit(MAX_LENGTH * (i + 1));
fileChannel.read(blockBuffer, blockOffset.get(i));
IOUtils.readFully(fileChannel, blockBuffer, blockOffset.get(i));
blockBuffer.position(4 + i * Long.BYTES);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.iotdb.db.storageengine.dataregion.wal.io;

import org.apache.iotdb.commons.utils.IOUtils;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -63,7 +65,7 @@ public static WALFileVersion getVersion(FileChannel channel) throws IOException
continue;
}
ByteBuffer buffer = ByteBuffer.allocate(version.versionBytes.length);
channel.read(buffer);
IOUtils.readFully(channel, buffer);
buffer.flip();
String versionString = new String(buffer.array(), StandardCharsets.UTF_8);
if (version.versionString.equals(versionString)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.iotdb.db.storageengine.dataregion.wal.io;

import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.utils.IOUtils;
import org.apache.iotdb.db.service.metrics.WritingMetrics;
import org.apache.iotdb.db.utils.MmapUtil;

Expand Down Expand Up @@ -85,7 +86,8 @@ private void getEndOffset() throws IOException {
if (version == WALFileVersion.V2) {
// New Version
ByteBuffer magicStringBuffer = ByteBuffer.allocate(version.getVersionBytes().length);
channel.read(magicStringBuffer, channel.size() - version.getVersionBytes().length);
IOUtils.readFully(
channel, magicStringBuffer, channel.size() - version.getVersionBytes().length);
magicStringBuffer.flip();
if (logFile.getName().endsWith(IoTDBConstant.WAL_CHECKPOINT_FILE_SUFFIX)
|| !new String(magicStringBuffer.array(), StandardCharsets.UTF_8)
Expand All @@ -105,7 +107,8 @@ private void getEndOffset() throws IOException {
}
// Old version
ByteBuffer magicStringBuffer = ByteBuffer.allocate(version.getVersionBytes().length);
channel.read(magicStringBuffer, channel.size() - version.getVersionBytes().length);
IOUtils.readFully(
channel, magicStringBuffer, channel.size() - version.getVersionBytes().length);
magicStringBuffer.flip();
if (!new String(magicStringBuffer.array(), StandardCharsets.UTF_8)
.equals(version.getVersionString())) {
Expand All @@ -117,7 +120,7 @@ private void getEndOffset() throws IOException {
}
}
// Read the metadata size
channel.read(metadataSizeBuf, position);
IOUtils.readFully(channel, metadataSizeBuf, position);
metadataSizeBuf.flip();
int metadataSize = metadataSizeBuf.getInt();
endOffset = channel.size() - version.getVersionBytes().length - Integer.BYTES - metadataSize;
Expand Down Expand Up @@ -237,9 +240,7 @@ private void loadNextSegmentV2() throws IOException {
compressedBuffer.clear();
// limit the buffer to prevent it from reading too much byte than expected
compressedBuffer.limit(segmentInfo.dataInDiskSize);
if (readWALBufferFromChannel(compressedBuffer) != segmentInfo.dataInDiskSize) {
throw new IOException("Unexpected end of file");
}
readWALBufferFullyFromChannel(compressedBuffer);
compressedBuffer.flip();
IUnCompressor unCompressor = IUnCompressor.getUnCompressor(segmentInfo.compressionType);
uncompressWALBuffer(compressedBuffer, dataBuffer, unCompressor);
Expand All @@ -255,9 +256,7 @@ private void loadNextSegmentV2() throws IOException {
// limit the buffer to prevent it from reading too much byte than expected
dataBuffer.limit(segmentInfo.dataInDiskSize);

if (readWALBufferFromChannel(dataBuffer) != segmentInfo.dataInDiskSize) {
throw new IOException("Unexpected end of file");
}
readWALBufferFullyFromChannel(dataBuffer);
}
dataBuffer.flip();
}
Expand Down Expand Up @@ -301,7 +300,7 @@ public void skipToGivenLogicalPosition(long pos) throws IOException {

if (segmentInfo.compressionType != CompressionType.UNCOMPRESSED) {
compressedBuffer = ByteBuffer.allocateDirect(segmentInfo.dataInDiskSize);
readWALBufferFromChannel(compressedBuffer);
readWALBufferFullyFromChannel(compressedBuffer);
compressedBuffer.flip();
IUnCompressor unCompressor = IUnCompressor.getUnCompressor(segmentInfo.compressionType);
dataBuffer = ByteBuffer.allocateDirect(segmentInfo.uncompressedSize);
Expand All @@ -310,7 +309,7 @@ public void skipToGivenLogicalPosition(long pos) throws IOException {
compressedBuffer = null;
} else {
dataBuffer = ByteBuffer.allocateDirect(segmentInfo.dataInDiskSize);
readWALBufferFromChannel(dataBuffer);
readWALBufferFullyFromChannel(dataBuffer);
dataBuffer.flip();
}

Expand Down Expand Up @@ -349,15 +348,15 @@ public WALMetaData getWALMetaData() throws IOException {

private SegmentInfo getNextSegmentInfo() throws IOException {
segmentHeaderWithoutCompressedSizeBuffer.clear();
channel.read(segmentHeaderWithoutCompressedSizeBuffer);
readWALBufferFullyFromChannel(segmentHeaderWithoutCompressedSizeBuffer);
segmentHeaderWithoutCompressedSizeBuffer.flip();
SegmentInfo info = new SegmentInfo();
info.compressionType =
CompressionType.deserialize(segmentHeaderWithoutCompressedSizeBuffer.get());
info.dataInDiskSize = segmentHeaderWithoutCompressedSizeBuffer.getInt();
if (info.compressionType != CompressionType.UNCOMPRESSED) {
compressedSizeBuffer.clear();
readWALBufferFromChannel(compressedSizeBuffer);
readWALBufferFullyFromChannel(compressedSizeBuffer);
compressedSizeBuffer.flip();
info.uncompressedSize = compressedSizeBuffer.getInt();
} else {
Expand All @@ -373,6 +372,13 @@ private int readWALBufferFromChannel(ByteBuffer buffer) throws IOException {
return size;
}

private void readWALBufferFullyFromChannel(ByteBuffer buffer) throws IOException {
long startTime = System.nanoTime();
int size = buffer.remaining();
IOUtils.readFully(channel, buffer);
WritingMetrics.getInstance().recordWALRead(size, System.nanoTime() - startTime);
}

private void uncompressWALBuffer(
ByteBuffer compressed, ByteBuffer uncompressed, IUnCompressor unCompressor)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iotdb.db.storageengine.dataregion.wal.io;

import org.apache.iotdb.commons.utils.IOUtils;
import org.apache.iotdb.consensus.iot.log.ConsensusReqReader;
import org.apache.iotdb.db.storageengine.dataregion.wal.exception.BrokenWALFileException;
import org.apache.iotdb.db.utils.SerializedSize;
Expand Down Expand Up @@ -143,12 +144,12 @@ public static WALMetaData readFromWALFile(File logFile, FileChannel channel) thr
ByteBuffer metadataSizeBuf = ByteBuffer.allocate(Integer.BYTES);
WALFileVersion version = WALFileVersion.getVersion(channel);
position = channel.size() - Integer.BYTES - (version.getVersionBytes().length);
channel.read(metadataSizeBuf, position);
IOUtils.readFully(channel, metadataSizeBuf, position);
metadataSizeBuf.flip();
// load metadata
int metadataSize = metadataSizeBuf.getInt();
ByteBuffer metadataBuf = ByteBuffer.allocate(metadataSize);
channel.read(metadataBuf, position - metadataSize);
IOUtils.readFully(channel, metadataBuf, position - metadataSize);
metadataBuf.flip();
metaData = WALMetaData.deserialize(metadataBuf);
// versions before V1.3, should recover memTable ids from entries
Expand All @@ -157,8 +158,8 @@ public static WALMetaData readFromWALFile(File logFile, FileChannel channel) thr
for (int size : metaData.buffersSize) {
channel.position(offset);
ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
channel.read(buffer);
buffer.clear();
IOUtils.readFully(channel, buffer);
buffer.flip();
metaData.memTablesId.add(buffer.getLong());
offset += size;
}
Expand All @@ -175,7 +176,8 @@ public static WALMetaData readFromWALFile(File logFile, FileChannel channel) thr

private static boolean isValidMagicString(FileChannel channel) throws IOException {
ByteBuffer magicStringBytes = ByteBuffer.allocate(WALFileVersion.V2.getVersionBytes().length);
channel.read(magicStringBytes, channel.size() - WALFileVersion.V2.getVersionBytes().length);
IOUtils.readFully(
channel, magicStringBytes, channel.size() - WALFileVersion.V2.getVersionBytes().length);
magicStringBytes.flip();
String magicString = new String(magicStringBytes.array(), StandardCharsets.UTF_8);
return magicString.equals(WALFileVersion.V2.getVersionString())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iotdb.db.storageengine.dataregion.wal.recover;

import org.apache.iotdb.commons.utils.IOUtils;
import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALFileVersion;
import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALMetaData;
import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALWriter;
Expand Down Expand Up @@ -65,7 +66,7 @@ private String readTailMagic(WALFileVersion version) throws IOException {
}
try (FileChannel channel = FileChannel.open(logFile.toPath(), StandardOpenOption.READ)) {
ByteBuffer magicStringBytes = ByteBuffer.allocate(size);
channel.read(magicStringBytes, channel.size() - size);
IOUtils.readFully(channel, magicStringBytes, channel.size() - size);
magicStringBytes.flip();
return new String(magicStringBytes.array(), StandardCharsets.UTF_8);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import javax.annotation.Nonnull;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -304,9 +305,7 @@ private InputStream createTsFileDataInputStream() {
protected void deserializeTsFileDataByte(final InputStream stream) throws IOException {
final int size = ReadWriteIOUtils.readInt(stream);
this.chunkData = new byte[size];
if (size != stream.read(chunkData)) {
throw new IOException("TsFileData byte array read error, size mismatch.");
}
new DataInputStream(stream).readFully(chunkData);
}

private void deserializeEntireChunk(final InputStream stream, final TsFileIOWriter writer)
Expand Down
Loading
Loading