
Commit

Merge branch 'bqms_catalog' of https://github.com/ahmedabu98/beam into bqms_catalog
ahmedabu98 committed Jan 8, 2025
2 parents 84fddd9 + 1d56247 commit 1031172
Showing 26 changed files with 708 additions and 82 deletions.
3 changes: 3 additions & 0 deletions .github/REVIEWERS.yml
@@ -56,6 +56,9 @@ labels:
reviewers:
- igorbernstein2
- mutianf
- djyau
- andre-sampaio
- meeral-k
exclusionList: []
- name: healthcare
reviewers:
2 changes: 1 addition & 1 deletion .github/trigger_files/beam_PostCommit_Java_DataflowV2.json
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 2
"modification": 3
}
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 2
"modification": 3
}
1 change: 1 addition & 0 deletions CHANGES.md
@@ -79,6 +79,7 @@
## Bugfixes

* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* [BigQueryIO] Fixed an issue where Storage Write API sometimes doesn't pick up auto-schema updates ([#33231](https://github.com/apache/beam/pull/33231))

## Security Fixes
* Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).
16 changes: 9 additions & 7 deletions runners/prism/java/build.gradle
@@ -98,6 +98,7 @@ def sickbayTests = [
'org.apache.beam.sdk.transforms.ViewTest.testTriggeredLatestSingleton',
// Requires Allowed Lateness, among others.
'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testEventTimeTimerSetWithinAllowedLateness',
'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInputWithLateDataAndAllowedLateness',
'org.apache.beam.sdk.testing.TestStreamTest.testFirstElementLate',
'org.apache.beam.sdk.testing.TestStreamTest.testDiscardingMode',
'org.apache.beam.sdk.testing.TestStreamTest.testEarlyPanesOfWindow',
@@ -160,6 +161,14 @@ def sickbayTests = [
// TODO(https://github.com/apache/beam/issues/31231)
'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributePreservesMetadata',


// These tests began failing once late data was precisely dropped.
// Each sets a single element to be late data, and expects it (correctly) to be preserved.
// Since late elements are presently treated as no-ops, the fix is to disable the
// dropping behavior when a stage's input is a Reshuffle/Redistribute transform.
'org.apache.beam.sdk.transforms.ReshuffleTest.testReshuffleWithTimestampsStreaming',
'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributeWithTimestampsStreaming',

// Prism isn't handling Java's side input views properly.
// https://github.com/apache/beam/issues/32932
// java.lang.IllegalArgumentException: PCollection with more than one element accessed as a singleton view.
@@ -177,13 +186,6 @@ def sickbayTests = [
// java.lang.IllegalStateException: java.io.EOFException
'org.apache.beam.sdk.transforms.ViewTest.testSideInputWithNestedIterables',

// Requires Time Sorted Input
'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInput',
'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInputWithTestStream',
'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInputWithLateDataAndAllowedLateness',
'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testTwoRequiresTimeSortedInputWithLateData',
'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInputWithLateData',

// Missing output due to processing time timer skew.
'org.apache.beam.sdk.transforms.ParDoTest$TimestampTests.testProcessElementSkew',

12 changes: 12 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
@@ -1179,6 +1179,18 @@ func makeStageState(ID string, inputIDs, outputIDs []string, sides []LinkID) *st
func (ss *stageState) AddPending(newPending []element) int {
ss.mu.Lock()
defer ss.mu.Unlock()
// TODO(https://github.com/apache/beam/issues/31438):
// Adjust with AllowedLateness
// Data that arrives after the *output* watermark is late.
threshold := ss.output
origPending := make([]element, 0, ss.pending.Len())
for _, e := range newPending {
if e.window.MaxTimestamp() < threshold {
continue
}
origPending = append(origPending, e)
}
newPending = origPending
if ss.stateful {
if ss.pendingByKeys == nil {
ss.pendingByKeys = map[string]*dataAndTimers{}
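The hunk above is the core of the late-data change: an element is droppable when its window's maximum timestamp is behind the stage's output watermark. A minimal Go sketch of that rule (plain int64 millis stand in for Prism's mtime type; all names here are illustrative, not Prism's):

package main

import "fmt"

type element struct {
	windowMaxMillis int64 // what window.MaxTimestamp() would report
	value           string
}

// filterLate mirrors the AddPending guard: a window that closed before the
// output watermark can no longer produce output, so its elements are dropped.
func filterLate(newPending []element, outputWatermarkMillis int64) []element {
	kept := make([]element, 0, len(newPending))
	for _, e := range newPending {
		if e.windowMaxMillis < outputWatermarkMillis {
			continue // late data
		}
		kept = append(kept, e)
	}
	return kept
}

func main() {
	els := []element{{windowMaxMillis: 90, value: "late"}, {windowMaxMillis: 200, value: "on time"}}
	fmt.Println(filterLate(els, 100)) // keeps only the on-time element
}

Per the TODO in the hunk, once AllowedLateness is threaded through, the threshold would shift to the output watermark minus the allowed lateness rather than the output watermark itself.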
7 changes: 5 additions & 2 deletions sdks/go/pkg/beam/runners/prism/internal/handlepardo.go
@@ -84,9 +84,12 @@ func (h *pardo) PrepareTransform(tid string, t *pipepb.PTransform, comps *pipepb

// At their simplest, we don't need to do anything special at pre-processing time, and simply pass through as normal.

// StatefulDoFns need to be marked as being roots.
// ForceRoots cause fusion breaks in the optimized graph.
// StatefulDoFns need to be marked as being roots, for correct per-key state handling.
// Prism already sorts input elements for a stage by EventTime, so a fusion break enables the sorted behavior.
var forcedRoots []string
if len(pdo.StateSpecs)+len(pdo.TimerFamilySpecs) > 0 {
if len(pdo.GetStateSpecs())+len(pdo.GetTimerFamilySpecs()) > 0 ||
pdo.GetRequiresTimeSortedInput() {
forcedRoots = append(forcedRoots, tid)
}

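The updated guard reduces to a small predicate; a sketch with illustrative names of when Prism now forces a fusion break:

package main

import "fmt"

// needsForcedRoot mirrors the condition above: a ParDo with per-key state,
// timers, or a time-sorted-input requirement becomes the root of its own
// stage, where Prism's per-stage EventTime sorting of input elements applies.
func needsForcedRoot(numStateSpecs, numTimerFamilies int, requiresTimeSortedInput bool) bool {
	return numStateSpecs+numTimerFamilies > 0 || requiresTimeSortedInput
}

func main() {
	fmt.Println(needsForcedRoot(0, 0, true)) // true: time-sorted input alone now forces a root
}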
1 change: 1 addition & 0 deletions sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
@@ -47,6 +47,7 @@ var supportedRequirements = map[string]struct{}{
urns.RequirementStatefulProcessing: {},
urns.RequirementBundleFinalization: {},
urns.RequirementOnWindowExpiration: {},
urns.RequirementTimeSortedInput: {},
}

// TODO, move back to main package, and key off of executor handlers?
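supportedRequirements acts as an allow-list at job submission: a pipeline declaring a requirement URN missing from this set is rejected before it runs. A self-contained sketch of that gate, assuming Beam's standard URN strings for the two requirements shown:

package main

import "fmt"

var supportedRequirements = map[string]struct{}{
	"beam:requirement:pardo:stateful:v1":          {},
	"beam:requirement:pardo:time_sorted_input:v1": {}, // newly accepted by this commit
}

// checkRequirements mirrors the submission-time gate: unknown requirement
// URNs fail fast instead of risking silent misexecution.
func checkRequirements(reqs []string) error {
	for _, r := range reqs {
		if _, ok := supportedRequirements[r]; !ok {
			return fmt.Errorf("unsupported requirement: %v", r)
		}
	}
	return nil
}

func main() {
	fmt.Println(checkRequirements([]string{"beam:requirement:pardo:time_sorted_input:v1"})) // <nil>
}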
4 changes: 4 additions & 0 deletions sdks/go/pkg/beam/transforms/stats/quantiles.go
@@ -701,6 +701,10 @@ func reduce(s beam.Scope, weightedElements beam.PCollection, state approximateQu
// For example, if numQuantiles = 2, the returned list would contain a single element such that approximately half of the input would be less than that element and half would be greater or equal.
func ApproximateWeightedQuantiles(s beam.Scope, pc beam.PCollection, less any, opts Opts) beam.PCollection {
_, t := beam.ValidateKVType(pc)
// Return zero elements immediately if the requested number of quantiles is 1.
if opts.NumQuantiles == 1 {
return beam.Create(s, reflect.New(reflect.SliceOf(t.Type())).Elem().Interface())
}
state := approximateQuantilesCombineFnState{
K: opts.K,
NumQuantiles: opts.NumQuantiles,
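For context, a usage sketch of the transform being patched, following the call shape in the test added below (the direct-runner harness here is an assumption, not part of the commit). NumQuantiles: 2 yields one cut point, an approximate median; after the fix, NumQuantiles: 1 short-circuits to an empty output instead of running the combine:

package main

import (
	"context"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/direct"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
)

func less(a, b int) bool { return a < b }

func main() {
	beam.Init()
	p, s := beam.NewPipelineWithRoot()
	input := beam.Create(s, 3, 1, 4, 1, 5, 9, 2, 6)
	// One cut point: roughly half of the input falls below it.
	stats.ApproximateQuantiles(s, input, less, stats.Opts{
		K:            200,
		NumQuantiles: 2,
	})
	if _, err := direct.Execute(context.Background(), p); err != nil {
		panic(err)
	}
}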
17 changes: 17 additions & 0 deletions sdks/go/pkg/beam/transforms/stats/quantiles_test.go
@@ -246,3 +246,20 @@ func TestWeightedElementEncoding(t *testing.T) {
t.Errorf("Invalid coder. Wanted %v got %v", w, decoded)
}
}

func TestZeroQuantiles(t *testing.T) {
const numElements int = 30000
inputSlice := make([]int, 0, numElements)
for i := 0; i < numElements; i++ {
inputSlice = append(inputSlice, i)
}
p, s, input, expected := ptest.CreateList2(inputSlice, [][]int{{}})
quantiles := ApproximateQuantiles(s, input, less, Opts{
K: 200,
NumQuantiles: 1,
})
passert.Equals(s, quantiles, expected)
if err := ptest.Run(p); err != nil {
t.Errorf("ApproximateQuantiles failed: %v", err)
}
}
@@ -3764,7 +3764,9 @@ public void testRequiresTimeSortedInputWithLateData() {
if (stamp == 100) {
// advance watermark when we have 100 remaining elements
// all the rest are going to be late elements
input = input.advanceWatermarkTo(Instant.ofEpochMilli(stamp));
input =
input.advanceWatermarkTo(
GlobalWindow.INSTANCE.maxTimestamp().plus(Duration.standardSeconds(1)));
}
}
testTimeSortedInput(
@@ -3796,7 +3798,9 @@ public void testTwoRequiresTimeSortedInputWithLateData() {
if (stamp == 100) {
// advance watermark when we have 100 remaining elements
// all the rest are going to be late elements
input = input.advanceWatermarkTo(Instant.ofEpochMilli(stamp));
input =
input.advanceWatermarkTo(
GlobalWindow.INSTANCE.maxTimestamp().plus(Duration.standardSeconds(1)));
}
}
// apply the sorted function for the first time
@@ -213,7 +213,7 @@ WriteStream createWriteStream(String tableUrn, WriteStream.Type type)
throws IOException, InterruptedException;

@Nullable
WriteStream getWriteStream(String writeStream);
TableSchema getWriteStreamSchema(String writeStream);

/**
* Create an append client for a given Storage API write stream. The stream must be created
@@ -76,6 +76,7 @@
import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamResponse;
import com.google.cloud.bigquery.storage.v1.FlushRowsRequest;
import com.google.cloud.bigquery.storage.v1.FlushRowsResponse;
import com.google.cloud.bigquery.storage.v1.GetWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.bigquery.storage.v1.ProtoSchema;
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
@@ -86,6 +87,7 @@
import com.google.cloud.bigquery.storage.v1.StreamWriter;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.cloud.bigquery.storage.v1.WriteStreamView;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
import com.google.protobuf.DescriptorProtos;
@@ -1418,8 +1420,15 @@ public WriteStream createWriteStream(String tableUrn, WriteStream.Type type)
}

@Override
public @Nullable WriteStream getWriteStream(String writeStream) {
return newWriteClient.getWriteStream(writeStream);
public @Nullable TableSchema getWriteStreamSchema(String writeStream) {
@Nullable
WriteStream stream =
newWriteClient.getWriteStream(
GetWriteStreamRequest.newBuilder()
.setView(WriteStreamView.FULL)
.setName(writeStream)
.build());
return (stream != null && stream.hasTableSchema()) ? stream.getTableSchema() : null;
}

@Override
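The Java change above must request the stream with WriteStreamView.FULL because the default view omits the table schema. For consistency with the other sketches, here is the same RPC pattern with the Cloud Go client (an assumption for illustration; the commit only touches the Java path):

package bqsketch

import (
	"context"

	storage "cloud.google.com/go/bigquery/storage/apiv1"
	"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
)

// GetWriteStreamSchema fetches a write stream with the FULL view, the only
// view that carries the stream's current table schema, and returns nil when
// no schema is attached, mirroring the @Nullable Java contract.
func GetWriteStreamSchema(ctx context.Context, client *storage.BigQueryWriteClient, name string) (*storagepb.TableSchema, error) {
	ws, err := client.GetWriteStream(ctx, &storagepb.GetWriteStreamRequest{
		Name: name,
		View: storagepb.WriteStreamView_FULL, // the BASIC default omits the schema
	})
	if err != nil {
		return nil, err
	}
	return ws.GetTableSchema(), nil // nil if the stream has no schema
}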
@@ -29,7 +29,6 @@
import com.google.cloud.bigquery.storage.v1.Exceptions;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.cloud.bigquery.storage.v1.WriteStream.Type;
import com.google.protobuf.ByteString;
import com.google.protobuf.DescriptorProtos;
@@ -475,15 +474,18 @@ SchemaAndDescriptor getCurrentTableSchema(String stream, @Nullable TableSchema u
() -> {
if (autoUpdateSchema) {
@Nullable
WriteStream writeStream =
TableSchema streamSchema =
Preconditions.checkStateNotNull(maybeWriteStreamService)
.getWriteStream(streamName);
if (writeStream != null && writeStream.hasTableSchema()) {
TableSchema updatedFromStream = writeStream.getTableSchema();
currentSchema.set(updatedFromStream);
updated.set(true);
LOG.debug(
"Fetched updated schema for table {}:\n\t{}", tableUrn, updatedFromStream);
.getWriteStreamSchema(streamName);
if (streamSchema != null) {
Optional<TableSchema> newSchema =
TableSchemaUpdateUtils.getUpdatedSchema(initialTableSchema, streamSchema);
if (newSchema.isPresent()) {
currentSchema.set(newSchema.get());
updated.set(true);
LOG.debug(
"Fetched updated schema for table {}:\n\t{}", tableUrn, newSchema.get());
}
}
}
return null;
@@ -531,6 +531,28 @@ public void process(
element.getKey().getKey(), dynamicDestinations, datasetService);
tableSchema = converter.getTableSchema();
descriptor = converter.getDescriptor(false);

if (autoUpdateSchema) {
// A StreamWriter ignores table schema updates that happen prior to its creation.
// So before creating a StreamWriter below, we fetch the table schema to check if we
// missed an update.
// If so, use the new schema instead of the base schema
@Nullable
TableSchema streamSchema =
MoreObjects.firstNonNull(
writeStreamService.getWriteStreamSchema(getOrCreateStream.get()),
TableSchema.getDefaultInstance());
Optional<TableSchema> newSchema =
TableSchemaUpdateUtils.getUpdatedSchema(tableSchema, streamSchema);

if (newSchema.isPresent()) {
tableSchema = newSchema.get();
descriptor =
TableRowToStorageApiProto.descriptorSchemaFromTableSchema(
tableSchema, true, false);
updatedSchema.write(tableSchema);
}
}
}
AppendClientInfo info =
AppendClientInfo.of(
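Conceptually, this new block closes a race: a schema update that lands before the StreamWriter is created would otherwise be invisible to it. A deliberately simplified sketch with hypothetical names (the real check, TableSchemaUpdateUtils.getUpdatedSchema, additionally validates that the stream's schema is a legal evolution of the base schema):

package main

import "fmt"

// pickSchema prefers the schema already visible on the write stream over the
// base schema derived at startup, so updates that predate the writer are kept.
func pickSchema(base, fromStream []string) []string {
	if len(fromStream) > len(base) {
		return fromStream // the stream already saw an update, e.g. an added column
	}
	return base
}

func main() {
	fmt.Println(pickSchema([]string{"a"}, []string{"a", "b"})) // [a b]
}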
@@ -590,11 +590,11 @@ public WriteStream createWriteStream(String tableUrn, Type type) throws Interrup

@Override
@Nullable
public WriteStream getWriteStream(String streamName) {
public com.google.cloud.bigquery.storage.v1.TableSchema getWriteStreamSchema(String streamName) {
synchronized (FakeDatasetService.class) {
@Nullable Stream stream = writeStreams.get(streamName);
if (stream != null) {
return stream.toWriteStream();
return stream.toWriteStream().getTableSchema();
}
}
// TODO(relax): Return the exact error that BigQuery returns.