From db4017164fe4a0cd15b3ca4005c27612a0686f7e Mon Sep 17 00:00:00 2001 From: Lev Gorodinski Date: Mon, 19 Mar 2018 09:10:06 -0400 Subject: [PATCH] fix partial decompressed record batch response --- src/kafunk/Protocol.fs | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/src/kafunk/Protocol.fs b/src/kafunk/Protocol.fs index 8f5fe29..56c7213 100644 --- a/src/kafunk/Protocol.fs +++ b/src/kafunk/Protocol.fs @@ -704,8 +704,7 @@ module Protocol = MessageSet.WriteRecords (ms,BinaryZipper(value)) |> ignore value let compressedValue = CompressionCodec.compress compression value - if compressedValue.Count > value.Count then - //failwithf "compressed_value_larged|compressed=%i uncompressed=%i" compressedValue.Count value.Count + if compressedValue.Count > value.Count then compression <- CompressionCodec.None MessageSet.WriteRecords (ms,buf) else @@ -742,7 +741,7 @@ module Protocol = let offsetDelta = buf.ReadVarint() let key = buf.ReadVarintBytes() let value = buf.ReadVarintBytes() - let headersLength = buf.ReadVarint() + let headersLength = buf.ReadVarint() if headersLength < 0 then failwithf "invalid_headers_length=%i" headersLength else let headers = Array.zeroCreate headersLength for i = 0 to headers.Length - 1 do @@ -800,10 +799,13 @@ module Protocol = MessageSet.ReadRecords (buf,magicByte,numRecords,firstOffset,timestampType,firstTimestamp,maxTimestamp,mss) | compression -> let recordsLength = sizeInBytes - RecordBatch.RECORD_BATCH_OVERHEAD - let compressedValue = buf.Slice recordsLength - let decompressedValue = CompressionCodec.decompress compression compressedValue - MessageSet.ReadRecords (BinaryZipper(decompressedValue),magicByte,numRecords,firstOffset,timestampType,firstTimestamp,maxTimestamp,mss) - buf.ShiftOffset recordsLength + if buf.Buffer.Count < recordsLength then + buf.ShiftOffset buf.Buffer.Count + else + let compressedValue = buf.Slice recordsLength + let decompressedValue = CompressionCodec.decompress compression compressedValue + MessageSet.ReadRecords (BinaryZipper(decompressedValue),magicByte,numRecords,firstOffset,timestampType,firstTimestamp,maxTimestamp,mss) + buf.ShiftOffset recordsLength if checkCrc then let crcCount = buf.Buffer.Count - attributesOffset let crc = Crc.crc32C buf.Buffer.Array attributesOffset crcCount