diff --git a/src/main/java/io/openmessaging/storage/dledger/BatchAppendFuture.java b/src/main/java/io/openmessaging/storage/dledger/BatchAppendFuture.java index d88fd83c..0b36edee 100644 --- a/src/main/java/io/openmessaging/storage/dledger/BatchAppendFuture.java +++ b/src/main/java/io/openmessaging/storage/dledger/BatchAppendFuture.java @@ -19,6 +19,10 @@ public class BatchAppendFuture extends AppendFuture { private long[] positions; + public BatchAppendFuture() { + + } + public BatchAppendFuture(long timeOutMs) { super(timeOutMs); } @@ -30,4 +34,11 @@ public long[] getPositions() { public void setPositions(long[] positions) { this.positions = positions; } + + public static BatchAppendFuture newCompletedFuture(long pos, T value) { + BatchAppendFuture future = new BatchAppendFuture(); + future.setPos(pos); + future.complete(value); + return future; + } } \ No newline at end of file diff --git a/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java b/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java index 47931df3..32d3b9ae 100644 --- a/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java +++ b/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java @@ -149,6 +149,9 @@ public CompletableFuture waitAck(DLedgerEntry entry, boolea response.setIndex(entry.getIndex()); response.setTerm(entry.getTerm()); response.setPos(entry.getPos()); + if (isBatchWait) { + return BatchAppendFuture.newCompletedFuture(entry.getPos(), response); + } return AppendFuture.newCompletedFuture(entry.getPos(), response); } else { checkTermForPendingMap(entry.getTerm(), "waitAck");