Add query level aggregated shuffle bytes and rows in QueryStats
xiaoxmeng authored and mbasmanova committed Jun 30, 2022
1 parent de6ef8b commit 55e24cf
Showing 7 changed files with 126 additions and 6 deletions.
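The change surfaces two new query-level counters on QueryStats, getShuffledDataSize() and getShuffledPositions(), aggregated from the Exchange and Merge operators of every stage (see the QueryStats.java diff below). A minimal usage sketch, assuming a DistributedQueryRunner test harness like the one used by the new join test; ShuffleStatsExample and printShuffleStats are hypothetical names, not part of this commit:

import com.facebook.presto.Session;
import com.facebook.presto.execution.QueryStats;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.tests.DistributedQueryRunner;

public final class ShuffleStatsExample
{
    private ShuffleStatsExample() {}

    // Runs a query through the distributed runner and prints the new query-level
    // shuffle aggregates introduced by this commit.
    public static void printShuffleStats(DistributedQueryRunner queryRunner, Session session, String sql)
    {
        QueryId queryId = queryRunner.executeWithQueryId(session, sql).getQueryId();
        QueryStats stats = queryRunner.getQueryInfo(queryId).getQueryStats();
        System.out.printf(
                "query %s shuffled %s (%d rows)%n",
                queryId,
                stats.getShuffledDataSize(),
                stats.getShuffledPositions());
    }
}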
@@ -1056,6 +1056,8 @@ private static QueryStats pruneQueryStats(QueryStats queryStats)
queryStats.getRawInputPositions(),
queryStats.getProcessedInputDataSize(),
queryStats.getProcessedInputPositions(),
queryStats.getShuffledDataSize(),
queryStats.getShuffledPositions(),
queryStats.getOutputDataSize(),
queryStats.getOutputPositions(),
queryStats.getWrittenOutputPositions(),
@@ -15,7 +15,11 @@

import com.facebook.presto.common.RuntimeStats;
import com.facebook.presto.operator.BlockedReason;
import com.facebook.presto.operator.ExchangeOperator;
import com.facebook.presto.operator.MergeOperator;
import com.facebook.presto.operator.OperatorStats;
import com.facebook.presto.operator.ScanFilterAndProjectOperator;
import com.facebook.presto.operator.TableScanOperator;
import com.facebook.presto.operator.TableWriterOperator;
import com.facebook.presto.spi.eventlistener.StageGcStatistics;
import com.facebook.presto.sql.planner.PlanFragment;
@@ -100,6 +104,9 @@ public class QueryStats
private final DataSize processedInputDataSize;
private final long processedInputPositions;

private final DataSize shuffledDataSize;
private final long shuffledPositions;

private final DataSize outputDataSize;
private final long outputPositions;

@@ -172,6 +179,9 @@ public QueryStats(
@JsonProperty("processedInputDataSize") DataSize processedInputDataSize,
@JsonProperty("processedInputPositions") long processedInputPositions,

@JsonProperty("shuffledDataSize") DataSize shuffledDataSize,
@JsonProperty("shuffledPositions") long shuffledPositions,

@JsonProperty("outputDataSize") DataSize outputDataSize,
@JsonProperty("outputPositions") long outputPositions,

@@ -252,6 +262,10 @@ public QueryStats(
checkArgument(processedInputPositions >= 0, "processedInputPositions is negative");
this.processedInputPositions = processedInputPositions;

this.shuffledDataSize = requireNonNull(shuffledDataSize, "shuffledDataSize is null");
checkArgument(shuffledPositions >= 0, "shuffledPositions is negative");
this.shuffledPositions = shuffledPositions;

this.outputDataSize = requireNonNull(outputDataSize, "outputDataSize is null");
checkArgument(outputPositions >= 0, "outputPositions is negative");
this.outputPositions = outputPositions;
@@ -309,6 +323,9 @@ public static QueryStats create(
long processedInputDataSize = 0;
long processedInputPositions = 0;

long shuffledDataSize = 0;
long shuffledPositions = 0;

long outputDataSize = 0;
long outputPositions = 0;

@@ -354,12 +371,19 @@ public static QueryStats create(

if (stageInfo.getPlan().isPresent()) {
PlanFragment plan = stageInfo.getPlan().get();
if (!plan.getTableScanSchedulingOrder().isEmpty()) {
rawInputDataSize += stageExecutionStats.getRawInputDataSize().toBytes();
rawInputPositions += stageExecutionStats.getRawInputPositions();

processedInputDataSize += stageExecutionStats.getProcessedInputDataSize().toBytes();
processedInputPositions += stageExecutionStats.getProcessedInputPositions();
for (OperatorStats operatorStats : stageExecutionStats.getOperatorSummaries()) {
// NOTE: we need to check the concrete operator type to tell whether its input comes from a table scan or from a shuffle; a stage can have inputs from both kinds of sources.
String operatorType = operatorStats.getOperatorType();
if (operatorType.equals(ExchangeOperator.class.getSimpleName()) || operatorType.equals(MergeOperator.class.getSimpleName())) {
shuffledPositions += operatorStats.getRawInputPositions();
shuffledDataSize += operatorStats.getRawInputDataSize().toBytes();
}
else if (operatorType.equals(TableScanOperator.class.getSimpleName()) || operatorType.equals(ScanFilterAndProjectOperator.class.getSimpleName())) {
rawInputDataSize += operatorStats.getRawInputDataSize().toBytes();
rawInputPositions += operatorStats.getRawInputPositions();
processedInputDataSize += stageExecutionStats.getProcessedInputDataSize().toBytes();
processedInputPositions += stageExecutionStats.getProcessedInputPositions();
}
}

if (plan.isOutputTableWriterFragment()) {
@@ -454,6 +478,8 @@ public static QueryStats create(
rawInputPositions,
succinctBytes(processedInputDataSize),
processedInputPositions,
succinctBytes(shuffledDataSize),
shuffledPositions,
succinctBytes(outputDataSize),
outputPositions,

@@ -529,6 +555,8 @@ public static QueryStats immediateFailureQueryStats()
0,
new DataSize(0, BYTE),
0,
new DataSize(0, BYTE),
0,
0,
new DataSize(0, BYTE),
new DataSize(0, BYTE),
@@ -809,6 +837,18 @@ public long getProcessedInputPositions()
return processedInputPositions;
}

@JsonProperty
public DataSize getShuffledDataSize()
{
return shuffledDataSize;
}

@JsonProperty
public long getShuffledPositions()
{
return shuffledPositions;
}

@JsonProperty
public DataSize getOutputDataSize()
{
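For reference, the per-operator classification performed in create() above can be expressed as a standalone helper. This is a minimal sketch, assuming the same StageInfo accessors that QueryStats.create relies on (StageInfo.getAllStages and getLatestAttemptExecutionInfo); ShuffleAggregation and shuffledPositions are hypothetical names:

import java.util.Optional;

import com.facebook.presto.execution.StageInfo;
import com.facebook.presto.operator.ExchangeOperator;
import com.facebook.presto.operator.MergeOperator;
import com.facebook.presto.operator.OperatorStats;

public final class ShuffleAggregation
{
    private ShuffleAggregation() {}

    // Sums the raw input rows of every Exchange/Merge operator across all stages,
    // mirroring the query-level shuffledPositions aggregation added to QueryStats.create().
    public static long shuffledPositions(Optional<StageInfo> rootStage)
    {
        long shuffled = 0;
        for (StageInfo stage : StageInfo.getAllStages(rootStage)) {
            for (OperatorStats operator : stage.getLatestAttemptExecutionInfo().getStats().getOperatorSummaries()) {
                String operatorType = operator.getOperatorType();
                if (operatorType.equals(ExchangeOperator.class.getSimpleName()) || operatorType.equals(MergeOperator.class.getSimpleName())) {
                    shuffled += operator.getRawInputPositions();
                }
            }
        }
        return shuffled;
    }
}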
@@ -222,6 +222,9 @@ public class TestQueryStats
new DataSize(26, BYTE),
27,

new DataSize(30, BYTE),
29,

new DataSize(28, BYTE),
29,

@@ -306,6 +309,9 @@ static void assertExpectedQueryStats(QueryStats actual)
assertEquals(actual.getProcessedInputDataSize(), new DataSize(26, BYTE));
assertEquals(actual.getProcessedInputPositions(), 27);

assertEquals(actual.getShuffledDataSize(), new DataSize(30, BYTE));
assertEquals(actual.getShuffledPositions(), 29);

assertEquals(actual.getOutputDataSize(), new DataSize(28, BYTE));
assertEquals(actual.getOutputPositions(), 29);

@@ -103,6 +103,8 @@ public void testConstructor()
30,
DataSize.valueOf("31GB"),
32,
DataSize.valueOf("32GB"),
40,
33,
DataSize.valueOf("34GB"),
DataSize.valueOf("35GB"),
@@ -260,6 +260,8 @@ private QueryInfo createQueryInfo(String queryId, ResourceGroupId resourceGroupI
28,
DataSize.valueOf("29GB"),
30,
DataSize.valueOf("32GB"),
40,
DataSize.valueOf("31GB"),
32,
33,
@@ -61,6 +61,8 @@
"rawInputPositions": 0,
"processedInputDataSize": "0B",
"processedInputPositions": 0,
"shuffledDataSize": "0B",
"shuffledPositions": 0,
"outputDataSize": "0B",
"outputPositions": 0,
"writtenOutputPositions": 0,
@@ -15,16 +15,22 @@

import com.facebook.presto.Session;
import com.facebook.presto.common.type.Decimals;
import com.facebook.presto.execution.QueryInfo;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.facebook.presto.testing.MaterializedResult;
import com.facebook.presto.testing.MaterializedRow;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import org.testng.annotations.Test;

import static com.facebook.presto.SystemSessionProperties.JOIN_DISTRIBUTION_TYPE;
import static com.facebook.presto.SystemSessionProperties.JOIN_REORDERING_STRATEGY;
import static com.facebook.presto.SystemSessionProperties.OPTIMIZE_NULLS_IN_JOINS;
import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.sql.analyzer.FeaturesConfig.JoinDistributionType.BROADCAST;
import static com.facebook.presto.sql.analyzer.FeaturesConfig.JoinDistributionType.PARTITIONED;
import static com.facebook.presto.sql.analyzer.FeaturesConfig.JoinReorderingStrategy.NONE;
import static com.facebook.presto.testing.MaterializedResult.resultBuilder;
import static com.facebook.presto.testing.assertions.Assert.assertEquals;
import static com.facebook.presto.tests.QueryAssertions.assertContains;
@@ -33,11 +39,62 @@
import static com.facebook.presto.tests.QueryTemplate.queryTemplate;
import static com.google.common.collect.Iterables.getOnlyElement;
import static java.lang.String.format;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;

public abstract class AbstractTestJoinQueries
extends AbstractTestQueryFramework
{
@Test
public void testShuffledStatsWithInnerJoin()
{
// NOTE: only test the shuffled stats with a distributed query runner and when disk spilling is disabled.
if (!(getQueryRunner() instanceof DistributedQueryRunner) || getQueryRunner().getDefaultSession().getSystemProperty("spill_enabled", Boolean.class)) {
return;
}
DistributedQueryRunner queryRunner = (DistributedQueryRunner) getQueryRunner();

// Get the number of rows in orders table for query stats verification below.
long ordersRows = getTableRowCount("orders");
// Get the number of rows in lineitem table for query stats verification below.
long lineitemRows = getTableRowCount("lineitem");

String query = "SELECT a.orderkey, a.orderstatus, b.linenumber FROM orders a JOIN lineitem b ON a.orderkey = b.orderkey";
// Set session property to enforce a hash partitioned join.
Session partitionedJoin = Session.builder(getSession())
.setSystemProperty(JOIN_DISTRIBUTION_TYPE, PARTITIONED.name())
.setSystemProperty(JOIN_REORDERING_STRATEGY, NONE.name())
.build();
QueryId partitionQueryId = queryRunner.executeWithQueryId(partitionedJoin, query).getQueryId();
QueryInfo partitionJoinQueryInfo = queryRunner.getQueryInfo(partitionQueryId);
long expectedRawInputRows = ordersRows + lineitemRows;
// Verify the number of shuffled rows, raw input rows and output rows in the hash partitioned join.
// NOTE: the latter two should be the same for both the hash partitioned join and the broadcast join.
assertEquals(partitionJoinQueryInfo.getQueryStats().getRawInputPositions(), expectedRawInputRows);
long expectedOutputRows = lineitemRows;
assertEquals(partitionJoinQueryInfo.getQueryStats().getOutputPositions(), expectedOutputRows);
long expectedPartitionJoinShuffledRows = lineitemRows + ordersRows + expectedOutputRows;
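// For example, assuming the TPC-H "tiny" tables that the distributed query runners in
// these tests typically load (about 15,000 orders rows and 60,175 lineitem rows; an
// illustrative assumption, not something this test depends on), the partitioned join
// shuffles 15,000 + 60,175 = 75,175 input rows plus 60,175 output rows,
// i.e. 135,350 shuffled rows in total.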
assertEquals(partitionJoinQueryInfo.getQueryStats().getShuffledPositions(), expectedPartitionJoinShuffledRows);

// Set session property to enforce a broadcast join.
Session broadcastJoin = Session.builder(getSession())
.setSystemProperty(JOIN_DISTRIBUTION_TYPE, BROADCAST.name())
.setSystemProperty(JOIN_REORDERING_STRATEGY, NONE.name())
.build();

QueryId broadcastQueryId = queryRunner.executeWithQueryId(broadcastJoin, query).getQueryId();
assertNotEquals(partitionQueryId, broadcastQueryId);
QueryInfo broadcastJoinQueryInfo = queryRunner.getQueryInfo(broadcastQueryId);
assertEquals(broadcastJoinQueryInfo.getQueryStats().getRawInputPositions(), expectedRawInputRows);
assertEquals(broadcastJoinQueryInfo.getQueryStats().getOutputPositions(), expectedOutputRows);
// NOTE: excluding the final output, the number of shuffled rows should be a multiple of the number of rows in the lineitem table in the broadcast join case, because the broadcast build side (lineitem) is replicated to every worker.
assertEquals(((broadcastJoinQueryInfo.getQueryStats().getShuffledPositions() - expectedOutputRows) % lineitemRows), 0);
assertTrue((broadcastJoinQueryInfo.getQueryStats().getShuffledPositions() - expectedOutputRows) >= lineitemRows);
// Both partitioned join and broadcast join should have the same raw input data size.
assertEquals(partitionJoinQueryInfo.getQueryStats().getRawInputDataSize().toBytes(), broadcastJoinQueryInfo.getQueryStats().getRawInputDataSize().toBytes());
assertNotEquals(partitionJoinQueryInfo.getQueryStats().getShuffledDataSize().toBytes(), broadcastJoinQueryInfo.getQueryStats().getShuffledDataSize().toBytes());
}

@Test
public void testRowFieldAccessorInJoin()
{
@@ -2459,4 +2516,13 @@ protected Session noJoinReordering()
.setSystemProperty(JOIN_DISTRIBUTION_TYPE, FeaturesConfig.JoinDistributionType.PARTITIONED.name())
.build();
}

private long getTableRowCount(String tableName)
{
String countQuery = "SELECT COUNT(*) FROM " + tableName;
MaterializedRow countRow = Iterables.getOnlyElement(getQueryRunner().execute(countQuery));
int rowFieldCount = countRow.getFieldCount();
assertEquals(rowFieldCount, 1);
return (long) countRow.getField(0);
}
}
