diff --git a/core/trino-main/src/main/java/io/trino/server/protocol/spooling/QueryDataEncodingConfig.java b/core/trino-main/src/main/java/io/trino/server/protocol/spooling/QueryDataEncodingConfig.java index 2b3f4afd1adb..844864e6e31c 100644 --- a/core/trino-main/src/main/java/io/trino/server/protocol/spooling/QueryDataEncodingConfig.java +++ b/core/trino-main/src/main/java/io/trino/server/protocol/spooling/QueryDataEncodingConfig.java @@ -15,12 +15,18 @@ import io.airlift.configuration.Config; import io.airlift.configuration.ConfigDescription; +import io.airlift.units.DataSize; +import io.airlift.units.MaxDataSize; +import io.airlift.units.MinDataSize; + +import static io.airlift.units.DataSize.Unit.KILOBYTE; public class QueryDataEncodingConfig { private boolean jsonEnabled = true; private boolean jsonZstdEnabled = true; private boolean jsonLz4Enabled = true; + private DataSize compressionThreshold = DataSize.of(8, KILOBYTE); public boolean isJsonEnabled() { @@ -60,4 +66,19 @@ public QueryDataEncodingConfig setJsonLz4Enabled(boolean jsonLz4Enabled) this.jsonLz4Enabled = jsonLz4Enabled; return this; } + + @MinDataSize("1kB") + @MaxDataSize("4MB") + public DataSize getCompressionThreshold() + { + return compressionThreshold; + } + + @Config("protocol.spooling.encoding.compression.threshold") + @ConfigDescription("Do not compress segments smaller than threshold") + public QueryDataEncodingConfig setCompressionThreshold(DataSize compressionThreshold) + { + this.compressionThreshold = compressionThreshold; + return this; + } } diff --git a/core/trino-main/src/main/java/io/trino/server/protocol/spooling/encoding/JsonQueryDataEncoder.java b/core/trino-main/src/main/java/io/trino/server/protocol/spooling/encoding/JsonQueryDataEncoder.java index 81b58988f917..c29432421c7f 100644 --- a/core/trino-main/src/main/java/io/trino/server/protocol/spooling/encoding/JsonQueryDataEncoder.java +++ b/core/trino-main/src/main/java/io/trino/server/protocol/spooling/encoding/JsonQueryDataEncoder.java @@ -22,6 +22,7 @@ import io.trino.server.protocol.JsonEncodingUtils.TypeEncoder; import io.trino.server.protocol.OutputColumn; import io.trino.server.protocol.spooling.QueryDataEncoder; +import io.trino.server.protocol.spooling.QueryDataEncodingConfig; import io.trino.spi.Page; import io.trino.spi.TrinoException; import io.trino.spi.connector.ConnectorSession; @@ -106,10 +107,18 @@ public String encoding() public static class ZstdFactory extends Factory { + private final int compressionThreshold; + + @Inject + public ZstdFactory(QueryDataEncodingConfig config) + { + this.compressionThreshold = toIntExact(config.getCompressionThreshold().toBytes()); + } + @Override public QueryDataEncoder create(Session session, List columns) { - return new ZstdQueryDataEncoder(super.create(session, columns)); + return new ZstdQueryDataEncoder(super.create(session, columns), compressionThreshold); } @Override @@ -122,10 +131,18 @@ public String encoding() public static class Lz4Factory extends Factory { + private final int compressionThreshold; + + @Inject + public Lz4Factory(QueryDataEncodingConfig config) + { + this.compressionThreshold = toIntExact(config.getCompressionThreshold().toBytes()); + } + @Override public QueryDataEncoder create(Session session, List columns) { - return new Lz4QueryDataEncoder(super.create(session, columns)); + return new Lz4QueryDataEncoder(super.create(session, columns), compressionThreshold); } @Override diff --git a/core/trino-main/src/main/java/io/trino/server/protocol/spooling/encoding/Lz4QueryDataEncoder.java b/core/trino-main/src/main/java/io/trino/server/protocol/spooling/encoding/Lz4QueryDataEncoder.java index 287b29c84cc9..1b6793be25ac 100644 --- a/core/trino-main/src/main/java/io/trino/server/protocol/spooling/encoding/Lz4QueryDataEncoder.java +++ b/core/trino-main/src/main/java/io/trino/server/protocol/spooling/encoding/Lz4QueryDataEncoder.java @@ -22,11 +22,9 @@ public class Lz4QueryDataEncoder extends CompressedQueryDataEncoder { - private static final int COMPRESSION_THRESHOLD = 8192; - - public Lz4QueryDataEncoder(QueryDataEncoder delegate) + public Lz4QueryDataEncoder(QueryDataEncoder delegate, int compressionThreshold) { - super(delegate, COMPRESSION_THRESHOLD); + super(delegate, compressionThreshold); } @Override diff --git a/core/trino-main/src/main/java/io/trino/server/protocol/spooling/encoding/ZstdQueryDataEncoder.java b/core/trino-main/src/main/java/io/trino/server/protocol/spooling/encoding/ZstdQueryDataEncoder.java index d4f91cc9e2c5..2d778b803857 100644 --- a/core/trino-main/src/main/java/io/trino/server/protocol/spooling/encoding/ZstdQueryDataEncoder.java +++ b/core/trino-main/src/main/java/io/trino/server/protocol/spooling/encoding/ZstdQueryDataEncoder.java @@ -22,11 +22,9 @@ public class ZstdQueryDataEncoder extends CompressedQueryDataEncoder { - private static final int COMPRESSION_THRESHOLD = 8192; - - public ZstdQueryDataEncoder(QueryDataEncoder delegate) + public ZstdQueryDataEncoder(QueryDataEncoder delegate, int compressionThreshold) { - super(delegate, COMPRESSION_THRESHOLD); + super(delegate, compressionThreshold); } @Override diff --git a/core/trino-main/src/test/java/io/trino/server/protocol/spooling/TestQueryDataEncodingConfig.java b/core/trino-main/src/test/java/io/trino/server/protocol/spooling/TestQueryDataEncodingConfig.java index 78844e285e03..ad7eda5359c9 100644 --- a/core/trino-main/src/test/java/io/trino/server/protocol/spooling/TestQueryDataEncodingConfig.java +++ b/core/trino-main/src/test/java/io/trino/server/protocol/spooling/TestQueryDataEncodingConfig.java @@ -14,6 +14,7 @@ package io.trino.server.protocol.spooling; import com.google.common.collect.ImmutableMap; +import io.airlift.units.DataSize; import org.junit.jupiter.api.Test; import java.util.Map; @@ -21,6 +22,8 @@ import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping; import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults; import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults; +import static io.airlift.units.DataSize.Unit.KILOBYTE; +import static io.airlift.units.DataSize.Unit.MEGABYTE; class TestQueryDataEncodingConfig { @@ -30,7 +33,8 @@ public void testDefaults() assertRecordedDefaults(recordDefaults(QueryDataEncodingConfig.class) .setJsonEnabled(true) .setJsonLz4Enabled(true) - .setJsonZstdEnabled(true)); + .setJsonZstdEnabled(true) + .setCompressionThreshold(DataSize.of(8, KILOBYTE))); } @Test @@ -40,12 +44,14 @@ public void testExplicitPropertyMappings() .put("protocol.spooling.encoding.json.enabled", "false") .put("protocol.spooling.encoding.json+lz4.enabled", "false") .put("protocol.spooling.encoding.json+zstd.enabled", "false") + .put("protocol.spooling.encoding.compression.threshold", "1MB") .buildOrThrow(); QueryDataEncodingConfig expected = new QueryDataEncodingConfig() .setJsonEnabled(false) .setJsonLz4Enabled(false) - .setJsonZstdEnabled(false); + .setJsonZstdEnabled(false) + .setCompressionThreshold(DataSize.of(1, MEGABYTE)); assertFullMapping(properties, expected); }