diff --git a/src/kafunk/Protocol.fs b/src/kafunk/Protocol.fs index 8f5fe29..56c7213 100644 --- a/src/kafunk/Protocol.fs +++ b/src/kafunk/Protocol.fs @@ -704,8 +704,7 @@ module Protocol = MessageSet.WriteRecords (ms,BinaryZipper(value)) |> ignore value let compressedValue = CompressionCodec.compress compression value - if compressedValue.Count > value.Count then - //failwithf "compressed_value_larged|compressed=%i uncompressed=%i" compressedValue.Count value.Count + if compressedValue.Count > value.Count then compression <- CompressionCodec.None MessageSet.WriteRecords (ms,buf) else @@ -742,7 +741,7 @@ module Protocol = let offsetDelta = buf.ReadVarint() let key = buf.ReadVarintBytes() let value = buf.ReadVarintBytes() - let headersLength = buf.ReadVarint() + let headersLength = buf.ReadVarint() if headersLength < 0 then failwithf "invalid_headers_length=%i" headersLength else let headers = Array.zeroCreate headersLength for i = 0 to headers.Length - 1 do @@ -800,10 +799,13 @@ module Protocol = MessageSet.ReadRecords (buf,magicByte,numRecords,firstOffset,timestampType,firstTimestamp,maxTimestamp,mss) | compression -> let recordsLength = sizeInBytes - RecordBatch.RECORD_BATCH_OVERHEAD - let compressedValue = buf.Slice recordsLength - let decompressedValue = CompressionCodec.decompress compression compressedValue - MessageSet.ReadRecords (BinaryZipper(decompressedValue),magicByte,numRecords,firstOffset,timestampType,firstTimestamp,maxTimestamp,mss) - buf.ShiftOffset recordsLength + if buf.Buffer.Count < recordsLength then + buf.ShiftOffset buf.Buffer.Count + else + let compressedValue = buf.Slice recordsLength + let decompressedValue = CompressionCodec.decompress compression compressedValue + MessageSet.ReadRecords (BinaryZipper(decompressedValue),magicByte,numRecords,firstOffset,timestampType,firstTimestamp,maxTimestamp,mss) + buf.ShiftOffset recordsLength if checkCrc then let crcCount = buf.Buffer.Count - attributesOffset let crc = Crc.crc32C buf.Buffer.Array attributesOffset crcCount