Skip to content

Commit

Permalink
Make compression threshold configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
wendigo committed Dec 2, 2024
1 parent 53e33ee commit 29c48ff
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,18 @@

import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
import io.airlift.units.DataSize;
import io.airlift.units.MaxDataSize;
import io.airlift.units.MinDataSize;

import static io.airlift.units.DataSize.Unit.KILOBYTE;

public class QueryDataEncodingConfig
{
private boolean jsonEnabled = true;
private boolean jsonZstdEnabled = true;
private boolean jsonLz4Enabled = true;
private DataSize compressionThreshold = DataSize.of(8, KILOBYTE);

public boolean isJsonEnabled()
{
Expand Down Expand Up @@ -60,4 +66,19 @@ public QueryDataEncodingConfig setJsonLz4Enabled(boolean jsonLz4Enabled)
this.jsonLz4Enabled = jsonLz4Enabled;
return this;
}

@MinDataSize("1kB")
@MaxDataSize("4MB")
public DataSize getCompressionThreshold()
{
return compressionThreshold;
}

@Config("protocol.spooling.encoding.compression.threshold")
@ConfigDescription("Do not compress segments smaller than threshold")
public QueryDataEncodingConfig setCompressionThreshold(DataSize compressionThreshold)
{
this.compressionThreshold = compressionThreshold;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.trino.server.protocol.JsonEncodingUtils.TypeEncoder;
import io.trino.server.protocol.OutputColumn;
import io.trino.server.protocol.spooling.QueryDataEncoder;
import io.trino.server.protocol.spooling.QueryDataEncodingConfig;
import io.trino.spi.Page;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorSession;
Expand Down Expand Up @@ -106,10 +107,18 @@ public String encoding()
public static class ZstdFactory
extends Factory
{
private final int compressionThreshold;

@Inject
public ZstdFactory(QueryDataEncodingConfig config)
{
this.compressionThreshold = toIntExact(config.getCompressionThreshold().toBytes());
}

@Override
public QueryDataEncoder create(Session session, List<OutputColumn> columns)
{
return new ZstdQueryDataEncoder(super.create(session, columns));
return new ZstdQueryDataEncoder(super.create(session, columns), compressionThreshold);
}

@Override
Expand All @@ -122,10 +131,18 @@ public String encoding()
public static class Lz4Factory
extends Factory
{
private final int compressionThreshold;

@Inject
public Lz4Factory(QueryDataEncodingConfig config)
{
this.compressionThreshold = toIntExact(config.getCompressionThreshold().toBytes());
}

@Override
public QueryDataEncoder create(Session session, List<OutputColumn> columns)
{
return new Lz4QueryDataEncoder(super.create(session, columns));
return new Lz4QueryDataEncoder(super.create(session, columns), compressionThreshold);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,9 @@
public class Lz4QueryDataEncoder
extends CompressedQueryDataEncoder
{
private static final int COMPRESSION_THRESHOLD = 8192;

public Lz4QueryDataEncoder(QueryDataEncoder delegate)
public Lz4QueryDataEncoder(QueryDataEncoder delegate, int compressionThreshold)
{
super(delegate, COMPRESSION_THRESHOLD);
super(delegate, compressionThreshold);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,9 @@
public class ZstdQueryDataEncoder
extends CompressedQueryDataEncoder
{
private static final int COMPRESSION_THRESHOLD = 8192;

public ZstdQueryDataEncoder(QueryDataEncoder delegate)
public ZstdQueryDataEncoder(QueryDataEncoder delegate, int compressionThreshold)
{
super(delegate, COMPRESSION_THRESHOLD);
super(delegate, compressionThreshold);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@
package io.trino.server.protocol.spooling;

import com.google.common.collect.ImmutableMap;
import io.airlift.units.DataSize;
import org.junit.jupiter.api.Test;

import java.util.Map;

import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping;
import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults;
import static io.airlift.units.DataSize.Unit.KILOBYTE;
import static io.airlift.units.DataSize.Unit.MEGABYTE;

class TestQueryDataEncodingConfig
{
Expand All @@ -30,7 +33,8 @@ public void testDefaults()
assertRecordedDefaults(recordDefaults(QueryDataEncodingConfig.class)
.setJsonEnabled(true)
.setJsonLz4Enabled(true)
.setJsonZstdEnabled(true));
.setJsonZstdEnabled(true)
.setCompressionThreshold(DataSize.of(8, KILOBYTE)));
}

@Test
Expand All @@ -40,12 +44,14 @@ public void testExplicitPropertyMappings()
.put("protocol.spooling.encoding.json.enabled", "false")
.put("protocol.spooling.encoding.json+lz4.enabled", "false")
.put("protocol.spooling.encoding.json+zstd.enabled", "false")
.put("protocol.spooling.encoding.compression.threshold", "1MB")
.buildOrThrow();

QueryDataEncodingConfig expected = new QueryDataEncodingConfig()
.setJsonEnabled(false)
.setJsonLz4Enabled(false)
.setJsonZstdEnabled(false);
.setJsonZstdEnabled(false)
.setCompressionThreshold(DataSize.of(1, MEGABYTE));

assertFullMapping(properties, expected);
}
Expand Down

0 comments on commit 29c48ff

Please sign in to comment.