Skip to content

RATIS-2571. DataStreamInput#readAsync returns ByteBuffers#1492

Merged
szetszwo merged 5 commits into
apache:masterfrom
amaliujia:change_API
Jun 26, 2026
Merged

RATIS-2571. DataStreamInput#readAsync returns ByteBuffers#1492
szetszwo merged 5 commits into
apache:masterfrom
amaliujia:change_API

Conversation

@amaliujia

@amaliujia amaliujia commented Jun 25, 2026

Copy link
Copy Markdown
Contributor

What changes were proposed in this pull request?

In https://github.com/apache/ratis/pull/1488/changes#diff-1164c13fc880c0b59a0752eaa11d0e063f366d107798ab9abfd75d0688631910R55-R61, In order to access the buffer, we realize that the existing DataStreamInput#readAsync would require the end user to know the implementation details.

    if (reply instanceof DataStreamReplyByteBuf) {
      return ((DataStreamReplyByteBuf) reply).slice().nioBuffer();
    } else if (reply instanceof DataStreamReplyByteBuffer) {
      return ((DataStreamReplyByteBuffer) reply).slice();
    }

This PR proposes that DataStreamInput#readAsync returns the Buffers to make it easy to use.

What is the link to the Apache JIRA

https://issues.apache.org/jira/browse/RATIS-2571

How was this patch tested?

Existing unit tests

@amaliujia

amaliujia commented Jun 25, 2026

Copy link
Copy Markdown
Contributor Author

@peterxcli @szetszwo

@amaliujia amaliujia changed the title RATIS-2571. DataStreamInput#readAsync returns Buffers RATIS-2571. DataStreamInput#readAsync returns ByteBuffers Jun 25, 2026

@peterxcli peterxcli left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM +1

Comment on lines +23 to +26
/* A chunk returned from the streaming read API. */
public interface DataStreamReadChunk {
ByteBuffer[] nioBuffers();
} No newline at end of file

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@amaliujia , thanks for working on this!

It is a good idea to have methods to get ByteBuffer. Let's add it to DataStreamPacket instead of having a new interface:

diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuf.java b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuf.java
index e6ceeb93d..a6ac94130 100644
--- a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuf.java
+++ b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuf.java
@@ -23,6 +23,7 @@ import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
 import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
 
+import java.nio.ByteBuffer;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -56,6 +57,16 @@ public class DataStreamPacketByteBuf extends DataStreamPacketImpl {
     return getBuf().slice();
   }
 
+  @Override
+  public ByteBuffer nioBuffer() {
+    return getBuf().nioBuffer();
+  }
+
+  @Override
+  public ByteBuffer[] nioBuffers() {
+    return getBuf().nioBuffers();
+  }
+
   public final void release() {
     final ByteBuf got = buf.getAndSet(null);
     if (got != null) {
diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java
index b8d7b48a7..a7d6f49d9 100644
--- a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java
+++ b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java
@@ -44,4 +44,14 @@ public abstract class DataStreamPacketByteBuffer extends DataStreamPacketImpl {
   public ByteBuffer slice() {
     return buffer.slice();
   }
+
+  @Override
+  public ByteBuffer nioBuffer() {
+    return slice();
+  }
+
+  @Override
+  public ByteBuffer[] nioBuffers() {
+    return new ByteBuffer[]{slice()};
+  }
 }
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacket.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacket.java
index caebb9e9b..95c5e6211 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacket.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacket.java
@@ -20,6 +20,8 @@ package org.apache.ratis.protocol;
 
 import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
 
+import java.nio.ByteBuffer;
+
 public interface DataStreamPacket {
   ClientId getClientId();
 
@@ -30,4 +32,12 @@ public interface DataStreamPacket {
   long getStreamOffset();
 
   long getDataLength();
+
+  default ByteBuffer nioBuffer() {
+    throw new UnsupportedOperationException();
+  }
+
+  default ByteBuffer[] nioBuffers() {
+    throw new UnsupportedOperationException();
+  }
 }
\ No newline at end of file

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Indeed this propose is much easier and cleaner

@szetszwo szetszwo left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 the change looks good.

@szetszwo szetszwo merged commit 898a39c into apache:master Jun 26, 2026
18 checks passed
@szetszwo

Copy link
Copy Markdown
Contributor

@amaliujia , thanks for working on this!

@peterxcli , thanks for reviewing this!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants