-
Notifications
You must be signed in to change notification settings - Fork 4.3k
/
Copy pathbigquery.py
3099 lines (2719 loc) · 127 KB
/
bigquery.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""BigQuery sources and sinks.
This module implements reading from and writing to BigQuery tables. It relies
on several classes exposed by the BigQuery API: TableSchema, TableFieldSchema,
TableRow, and TableCell. The default mode is to return table rows read from a
BigQuery source as dictionaries. Similarly a Write transform to a BigQuerySink
accepts PCollections of dictionaries. This is done for more convenient
programming. If desired, the native TableRow objects can be used throughout to
represent rows (use an instance of TableRowJsonCoder as a coder argument when
creating the sources or sinks respectively).
Also, for programming convenience, instances of TableReference and TableSchema
have a string representation that can be used for the corresponding arguments:
- TableReference can be a PROJECT:DATASET.TABLE or DATASET.TABLE string.
- TableSchema can be a NAME:TYPE{,NAME:TYPE}* string
(e.g. 'month:STRING,event_count:INTEGER').
The syntax supported is described here:
https://cloud.google.com/bigquery/bq-command-line-tool-quickstart
BigQuery sources can be used as main inputs or side inputs. A main input
(common case) is expected to be massive and will be split into manageable chunks
and processed in parallel. Side inputs are expected to be small and will be read
completely every time a ParDo DoFn gets executed. In the example below the
lambda function implementing the DoFn for the Map transform will get on each
call *one* row of the main table and *all* rows of the side table. The runner
may use some caching techniques to share the side inputs between calls in order
to avoid excessive reading:::
main_table = pipeline | 'VeryBig' >> beam.io.ReadFromBigQuery(...)
side_table = pipeline | 'NotBig' >> beam.io.ReadFromBigQuery(...)
results = (
main_table
| 'ProcessData' >> beam.Map(
lambda element, side_input: ..., AsList(side_table)))
There is no difference in how main and side inputs are read. What makes the
side_table a 'side input' is the AsList wrapper used when passing the table
as a parameter to the Map transform. AsList signals to the execution framework
that its input should be made available whole.
The main and side inputs are implemented differently. Reading a BigQuery table
as main input entails exporting the table to a set of GCS files (in AVRO or in
JSON format) and then processing those files.
Users may provide a query to read from rather than reading all of a BigQuery
table. If specified, the result obtained by executing the specified query will
be used as the data of the input transform.::
query_results = pipeline | beam.io.gcp.bigquery.ReadFromBigQuery(
query='SELECT year, mean_temp FROM samples.weather_stations')
When creating a BigQuery input transform, users should provide either a query
or a table. Pipeline construction will fail with a validation error if neither
or both are specified.
When reading via `ReadFromBigQuery` using `EXPORT`,
bytes are returned decoded as bytes.
This is due to the fact that ReadFromBigQuery uses Avro exports by default.
When reading from BigQuery using `apache_beam.io.BigQuerySource`, bytes are
returned as base64-encoded bytes. To get base64-encoded bytes using
`ReadFromBigQuery`, you can use the flag `use_json_exports` to export
data as JSON, and receive base64-encoded bytes.
ReadAllFromBigQuery
-------------------
Beam 2.27.0 introduces a new transform called `ReadAllFromBigQuery` which
allows you to define table and query reads from BigQuery at pipeline
runtime.:::
read_requests = p | beam.Create([
ReadFromBigQueryRequest(query='SELECT * FROM mydataset.mytable'),
ReadFromBigQueryRequest(table='myproject.mydataset.mytable')])
results = read_requests | ReadAllFromBigQuery()
A good application for this transform is in streaming pipelines to
refresh a side input coming from BigQuery. This would work like so:::
side_input = (
p
| 'PeriodicImpulse' >> PeriodicImpulse(
first_timestamp, last_timestamp, interval, True)
| 'MapToReadRequest' >> beam.Map(
lambda x: ReadFromBigQueryRequest(table='dataset.table'))
| beam.io.ReadAllFromBigQuery())
main_input = (
p
| 'MpImpulse' >> beam.Create(sample_main_input_elements)
|
'MapMpToTimestamped' >> beam.Map(lambda src: TimestampedValue(src, src))
| 'WindowMpInto' >> beam.WindowInto(
window.FixedWindows(main_input_windowing_interval)))
result = (
main_input
| 'ApplyCrossJoin' >> beam.FlatMap(
cross_join, rights=beam.pvalue.AsIter(side_input)))
**Note**: This transform is supported on Portable and Dataflow v2 runners.
**Note**: This transform does not currently clean up temporary datasets
created for its execution. (BEAM-11359)
Writing Data to BigQuery
========================
The `WriteToBigQuery` transform is the recommended way of writing data to
BigQuery. It supports a large set of parameters to customize how you'd like to
write to BigQuery.
Table References
----------------
This transform allows you to provide static `project`, `dataset` and `table`
parameters which point to a specific BigQuery table to be created. The `table`
parameter can also be a dynamic parameter (i.e. a callable), which receives an
element to be written to BigQuery, and returns the table that that element
should be sent to.
You may also provide a tuple of PCollectionView elements to be passed as side
inputs to your callable. For example, suppose that one wishes to send
events of different types to different tables, and the table names are
computed at pipeline runtime, one may do something like the following::
with Pipeline() as p:
elements = (p | 'Create elements' >> beam.Create([
{'type': 'error', 'timestamp': '12:34:56', 'message': 'bad'},
{'type': 'user_log', 'timestamp': '12:34:59', 'query': 'flu symptom'},
]))
table_names = (p | 'Create table_names' >> beam.Create([
('error', 'my_project:dataset1.error_table_for_today'),
('user_log', 'my_project:dataset1.query_table_for_today'),
]))
table_names_dict = beam.pvalue.AsDict(table_names)
elements | beam.io.gcp.bigquery.WriteToBigQuery(
table=lambda row, table_dict: table_dict[row['type']],
table_side_inputs=(table_names_dict,))
In the example above, the `table_dict` argument passed to the function in
`table_dict` is the side input coming from `table_names_dict`, which is passed
as part of the `table_side_inputs` argument.
Schemas
---------
This transform also allows you to provide a static or dynamic `schema`
parameter (i.e. a callable).
If providing a callable, this should take in a table reference (as returned by
the `table` parameter), and return the corresponding schema for that table.
This allows to provide different schemas for different tables::
def compute_table_name(row):
...
errors_schema = {'fields': [
{'name': 'type', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'message', 'type': 'STRING', 'mode': 'NULLABLE'}]}
queries_schema = {'fields': [
{'name': 'type', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'query', 'type': 'STRING', 'mode': 'NULLABLE'}]}
with Pipeline() as p:
elements = (p | beam.Create([
{'type': 'error', 'timestamp': '12:34:56', 'message': 'bad'},
{'type': 'user_log', 'timestamp': '12:34:59', 'query': 'flu symptom'},
]))
elements | beam.io.gcp.bigquery.WriteToBigQuery(
table=compute_table_name,
schema=lambda table: (errors_schema
if 'errors' in table
else queries_schema))
It may be the case that schemas are computed at pipeline runtime. In cases
like these, one can also provide a `schema_side_inputs` parameter, which is
a tuple of PCollectionViews to be passed to the schema callable (much like
the `table_side_inputs` parameter).
Additional Parameters for BigQuery Tables
-----------------------------------------
This sink is able to create tables in BigQuery if they don't already exist. It
also relies on creating temporary tables when performing file loads.
The WriteToBigQuery transform creates tables using the BigQuery API by
inserting a load job (see the API reference [1]), or by inserting a new table
(see the API reference for that [2][3]).
When creating a new BigQuery table, there are a number of extra parameters
that one may need to specify. For example, clustering, partitioning, data
encoding, etc. It is possible to provide these additional parameters by
passing a Python dictionary as `additional_bq_parameters` to the transform.
As an example, to create a table that has specific partitioning, and
clustering properties, one would do the following::
additional_bq_parameters = {
'timePartitioning': {'type': 'DAY'},
'clustering': {'fields': ['country']}}
with Pipeline() as p:
elements = (p | beam.Create([
{'country': 'mexico', 'timestamp': '12:34:56', 'query': 'acapulco'},
{'country': 'canada', 'timestamp': '12:34:59', 'query': 'influenza'},
]))
elements | beam.io.gcp.bigquery.WriteToBigQuery(
table='project_name1:dataset_2.query_events_table',
additional_bq_parameters=additional_bq_parameters)
Much like the schema case, the parameter with `additional_bq_parameters` can
also take a callable that receives a table reference.
[1] https://cloud.google.com/bigquery/docs/reference/rest/v2/Job\
#jobconfigurationload
[2] https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/insert
[3] https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#resource
Chaining of operations after WriteToBigQuery
--------------------------------------------
WritToBigQuery returns an object with several PCollections that consist of
metadata about the write operations. These are useful to inspect the write
operation and follow with the results::
schema = {'fields': [
{'name': 'column', 'type': 'STRING', 'mode': 'NULLABLE'}]}
error_schema = {'fields': [
{'name': 'destination', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'row', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'error_message', 'type': 'STRING', 'mode': 'NULLABLE'}]}
with Pipeline() as p:
result = (p
| 'Create Columns' >> beam.Create([
{'column': 'value'},
{'bad_column': 'bad_value'}
])
| 'Write Data' >> WriteToBigQuery(
method=WriteToBigQuery.Method.STREAMING_INSERTS,
table=my_table,
schema=schema,
insert_retry_strategy=RetryStrategy.RETRY_NEVER
))
_ = (result.failed_rows_with_errors
| 'Get Errors' >> beam.Map(lambda e: {
"destination": e[0],
"row": json.dumps(e[1]),
"error_message": e[2][0]['message']
})
| 'Write Errors' >> WriteToBigQuery(
method=WriteToBigQuery.Method.STREAMING_INSERTS,
table=error_log_table,
schema=error_schema,
))
Often, the simplest use case is to chain an operation after writing data to
BigQuery.To do this, one can chain the operation after one of the output
PCollections. A generic way in which this operation (independent of write
method) could look like::
def chain_after(result):
try:
# This works for FILE_LOADS, where we run load and possibly copy jobs.
return (result.destination_load_jobid_pairs,
result.destination_copy_jobid_pairs) | beam.Flatten()
except AttributeError:
# Works for STREAMING_INSERTS, where we return the rows BigQuery rejected
return result.failed_rows
result = (pcoll | WriteToBigQuery(...))
_ = (chain_after(result)
| beam.Reshuffle() # Force a 'commit' of the intermediate date
| MyOperationAfterWriteToBQ())
Attributes can be accessed using dot notation or bracket notation:
result.failed_rows <--> result['FailedRows']
result.failed_rows_with_errors <--> result['FailedRowsWithErrors']
result.destination_load_jobid_pairs <--> result['destination_load_jobid_pairs']
result.destination_file_pairs <--> result['destination_file_pairs']
result.destination_copy_jobid_pairs <--> result['destination_copy_jobid_pairs']
Writing with Storage Write API using Cross Language
---------------------------------------------------
This sink is able to write with BigQuery's Storage Write API. To do so, specify
the method `WriteToBigQuery.Method.STORAGE_WRITE_API`. This will use the
StorageWriteToBigQuery() transform to discover and use the Java implementation.
Using this transform directly will require the use of beam.Row() elements.
Similar to streaming inserts, it returns two dead-letter queue PCollections:
one containing just the failed rows and the other containing failed rows and
errors. They can be accessed with `failed_rows` and `failed_rows_with_errors`,
respectively. See the examples above for how to do this.
*** Short introduction to BigQuery concepts ***
Tables have rows (TableRow) and each row has cells (TableCell).
A table has a schema (TableSchema), which in turn describes the schema of each
cell (TableFieldSchema). The terms field and cell are used interchangeably.
TableSchema: Describes the schema (types and order) for values in each row.
Has one attribute, 'field', which is list of TableFieldSchema objects.
TableFieldSchema: Describes the schema (type, name) for one field.
Has several attributes, including 'name' and 'type'. Common values for
the type attribute are: 'STRING', 'INTEGER', 'FLOAT', 'BOOLEAN', 'NUMERIC',
'GEOGRAPHY'.
All possible values are described at:
https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
TableRow: Holds all values in a table row. Has one attribute, 'f', which is a
list of TableCell instances.
TableCell: Holds the value for one cell (or field). Has one attribute,
'v', which is a JsonValue instance. This class is defined in
apitools.base.py.extra_types.py module.
As of Beam 2.7.0, the NUMERIC data type is supported. This data type supports
high-precision decimal numbers (precision of 38 digits, scale of 9 digits).
The GEOGRAPHY data type works with Well-Known Text (See
https://en.wikipedia.org/wiki/Well-known_text) format for reading and writing
to BigQuery.
BigQuery IO requires values of BYTES datatype to be encoded using base64
encoding when writing to BigQuery.
**Updates to the I/O connector code**
For any significant updates to this I/O connector, please consider involving
corresponding code reviewers mentioned in
https://github.com/apache/beam/blob/master/sdks/python/OWNERS
"""
# pytype: skip-file
import collections
import io
import itertools
import json
import logging
import random
import secrets
import time
import uuid
import warnings
from dataclasses import dataclass
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
from typing import Union
import fastavro
from objsize import get_deep_size
import apache_beam as beam
from apache_beam import coders
from apache_beam import pvalue
from apache_beam.internal.gcp.json_value import from_json_value
from apache_beam.internal.gcp.json_value import to_json_value
from apache_beam.io import range_trackers
from apache_beam.io.avroio import _create_avro_source as create_avro_source
from apache_beam.io.filesystems import CompressionTypes
from apache_beam.io.filesystems import FileSystems
from apache_beam.io.gcp import bigquery_schema_tools
from apache_beam.io.gcp import bigquery_tools
from apache_beam.io.gcp.bigquery_io_metadata import create_bigquery_io_metadata
from apache_beam.io.gcp.bigquery_read_internal import _BigQueryReadSplit
from apache_beam.io.gcp.bigquery_read_internal import _JsonToDictCoder
from apache_beam.io.gcp.bigquery_read_internal import _PassThroughThenCleanup
from apache_beam.io.gcp.bigquery_read_internal import _PassThroughThenCleanupTempDatasets
from apache_beam.io.gcp.bigquery_read_internal import bigquery_export_destination_uri
from apache_beam.io.gcp.bigquery_tools import RetryStrategy
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.io.iobase import BoundedSource
from apache_beam.io.iobase import RangeTracker
from apache_beam.io.iobase import SDFBoundedSourceReader
from apache_beam.io.iobase import SourceBundle
from apache_beam.io.textio import _TextSource as TextSource
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import Lineage
from apache_beam.options import value_provider as vp
from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.value_provider import StaticValueProvider
from apache_beam.options.value_provider import ValueProvider
from apache_beam.options.value_provider import check_accessible
from apache_beam.pvalue import PCollection
from apache_beam.transforms import DoFn
from apache_beam.transforms import ParDo
from apache_beam.transforms import PTransform
from apache_beam.transforms.display import DisplayDataItem
from apache_beam.transforms.external import BeamJarExpansionService
from apache_beam.transforms.external import SchemaAwareExternalTransform
from apache_beam.transforms.sideinputs import SIDE_INPUT_PREFIX
from apache_beam.transforms.sideinputs import get_sideinput_index
from apache_beam.transforms.util import ReshufflePerKey
from apache_beam.typehints.row_type import RowTypeConstraint
from apache_beam.typehints.schemas import schema_from_element_type
from apache_beam.utils import retry
from apache_beam.utils.annotations import deprecated
try:
from apache_beam.io.gcp.internal.clients.bigquery import DatasetReference
from apache_beam.io.gcp.internal.clients.bigquery import TableReference
from apache_beam.io.gcp.internal.clients.bigquery import JobReference
except ImportError:
DatasetReference = None
TableReference = None
JobReference = None
_LOGGER = logging.getLogger(__name__)
try:
import google.cloud.bigquery_storage_v1 as bq_storage
except ImportError:
_LOGGER.info(
'No module named google.cloud.bigquery_storage_v1. '
'As a result, the ReadFromBigQuery transform *CANNOT* be '
'used with `method=DIRECT_READ`.')
__all__ = [
'TableRowJsonCoder',
'BigQueryDisposition',
'BigQuerySource',
'BigQuerySink',
'BigQueryQueryPriority',
'WriteToBigQuery',
'WriteResult',
'ReadFromBigQuery',
'ReadFromBigQueryRequest',
'ReadAllFromBigQuery',
'SCHEMA_AUTODETECT',
]
"""
Template for BigQuery jobs created by BigQueryIO. This template is:
`"beam_bq_job_{job_type}_{job_id}_{step_id}_{random}"`, where:
- `job_type` represents the BigQuery job type (e.g. extract / copy / load /
query).
- `job_id` is the Beam job name.
- `step_id` is a UUID representing the Dataflow step that created the
BQ job.
- `random` is a random string.
NOTE: This job name template does not have backwards compatibility guarantees.
"""
BQ_JOB_NAME_TEMPLATE = "beam_bq_job_{job_type}_{job_id}_{step_id}{random}"
"""
The maximum number of times that a bundle of rows that errors out should be
sent for insertion into BigQuery.
The default is 10,000 with exponential backoffs, so a bundle of rows may be
tried for a very long time. You may reduce this property to reduce the number
of retries.
"""
MAX_INSERT_RETRIES = 10000
"""
The maximum byte size for a BigQuery legacy streaming insert payload.
Note: The actual limit is 10MB, but we set it to 9MB to make room for request
overhead: https://cloud.google.com/bigquery/quotas#streaming_inserts
"""
MAX_INSERT_PAYLOAD_SIZE = 9 << 20
@deprecated(since='2.11.0', current="bigquery_tools.parse_table_reference")
def _parse_table_reference(table, dataset=None, project=None):
return bigquery_tools.parse_table_reference(table, dataset, project)
@deprecated(
since='2.11.0', current="bigquery_tools.parse_table_schema_from_json")
def parse_table_schema_from_json(schema_string):
return bigquery_tools.parse_table_schema_from_json(schema_string)
@deprecated(since='2.11.0', current="bigquery_tools.default_encoder")
def default_encoder(obj):
return bigquery_tools.default_encoder(obj)
@deprecated(since='2.11.0', current="bigquery_tools.RowAsDictJsonCoder")
def RowAsDictJsonCoder(*args, **kwargs):
return bigquery_tools.RowAsDictJsonCoder(*args, **kwargs)
@deprecated(since='2.11.0', current="bigquery_tools.BigQueryWrapper")
def BigQueryWrapper(*args, **kwargs):
return bigquery_tools.BigQueryWrapper(*args, **kwargs)
class TableRowJsonCoder(coders.Coder):
"""A coder for a TableRow instance to/from a JSON string.
Note that the encoding operation (used when writing to sinks) requires the
table schema in order to obtain the ordered list of field names. Reading from
sources on the other hand does not need the table schema.
"""
def __init__(self, table_schema=None):
# The table schema is needed for encoding TableRows as JSON (writing to
# sinks) because the ordered list of field names is used in the JSON
# representation.
self.table_schema = table_schema
# Precompute field names since we need them for row encoding.
if self.table_schema:
self.field_names = tuple(fs.name for fs in self.table_schema.fields)
self.field_types = tuple(fs.type for fs in self.table_schema.fields)
def encode(self, table_row):
if self.table_schema is None:
raise AttributeError(
'The TableRowJsonCoder requires a table schema for '
'encoding operations. Please specify a table_schema argument.')
try:
return json.dumps(
collections.OrderedDict(
zip(
self.field_names,
[from_json_value(f.v) for f in table_row.f])),
allow_nan=False,
default=bigquery_tools.default_encoder)
except ValueError as e:
raise ValueError('%s. %s' % (e, bigquery_tools.JSON_COMPLIANCE_ERROR))
def decode(self, encoded_table_row):
od = json.loads(
encoded_table_row, object_pairs_hook=collections.OrderedDict)
return bigquery.TableRow(
f=[bigquery.TableCell(v=to_json_value(e)) for e in od.values()])
class BigQueryDisposition(object):
"""Class holding standard strings used for create and write dispositions."""
CREATE_NEVER = 'CREATE_NEVER'
CREATE_IF_NEEDED = 'CREATE_IF_NEEDED'
WRITE_TRUNCATE = 'WRITE_TRUNCATE'
WRITE_APPEND = 'WRITE_APPEND'
WRITE_EMPTY = 'WRITE_EMPTY'
@staticmethod
def validate_create(disposition):
values = (
BigQueryDisposition.CREATE_NEVER, BigQueryDisposition.CREATE_IF_NEEDED)
if disposition not in values:
raise ValueError(
'Invalid create disposition %s. Expecting %s' % (disposition, values))
return disposition
@staticmethod
def validate_write(disposition):
values = (
BigQueryDisposition.WRITE_TRUNCATE,
BigQueryDisposition.WRITE_APPEND,
BigQueryDisposition.WRITE_EMPTY)
if disposition not in values:
raise ValueError(
'Invalid write disposition %s. Expecting %s' % (disposition, values))
return disposition
class BigQueryQueryPriority(object):
"""Class holding standard strings used for query priority."""
INTERACTIVE = 'INTERACTIVE'
BATCH = 'BATCH'
# -----------------------------------------------------------------------------
# BigQuerySource, BigQuerySink.
@deprecated(since='2.25.0', current="ReadFromBigQuery")
def BigQuerySource(
table=None,
dataset=None,
project=None,
query=None,
validate=False,
coder=None,
use_standard_sql=False,
flatten_results=True,
kms_key=None,
use_dataflow_native_source=False):
if use_dataflow_native_source:
warnings.warn(
"Native sources no longer implemented; "
"falling back to standard Beam source.")
return ReadFromBigQuery(
table=table,
dataset=dataset,
project=project,
query=query,
validate=validate,
coder=coder,
use_standard_sql=use_standard_sql,
flatten_results=flatten_results,
use_json_exports=True,
kms_key=kms_key)
@deprecated(since='2.25.0', current="ReadFromBigQuery")
def _BigQuerySource(*args, **kwargs):
"""A source based on a BigQuery table."""
warnings.warn(
"Native sources no longer implemented; "
"falling back to standard Beam source.")
return ReadFromBigQuery(*args, **kwargs)
# TODO(https://github.com/apache/beam/issues/21622): remove the serialization
# restriction in transform implementation once InteractiveRunner can work
# without runner api roundtrips.
@dataclass
class _BigQueryExportResult:
coder: beam.coders.Coder
paths: List[str]
class _CustomBigQuerySource(BoundedSource):
def __init__(
self,
method,
gcs_location=None,
table=None,
dataset=None,
project=None,
query=None,
validate=False,
pipeline_options=None,
coder=None,
use_standard_sql=False,
flatten_results=True,
kms_key=None,
bigquery_job_labels=None,
use_json_exports=False,
job_name=None,
step_name=None,
unique_id=None,
temp_dataset=None,
query_priority=BigQueryQueryPriority.BATCH):
if table is not None and query is not None:
raise ValueError(
'Both a BigQuery table and a query were specified.'
' Please specify only one of these.')
elif table is None and query is None:
raise ValueError('A BigQuery table or a query must be specified')
elif table is not None:
self.table_reference = bigquery_tools.parse_table_reference(
table, dataset, project)
self.query = None
self.use_legacy_sql = True
else:
if isinstance(query, str):
query = StaticValueProvider(str, query)
self.query = query
# TODO(BEAM-1082): Change the internal flag to be standard_sql
self.use_legacy_sql = not use_standard_sql
self.table_reference = None
self.method = method
self.gcs_location = gcs_location
self.project = project
self.validate = validate
self.flatten_results = flatten_results
self.coder = coder or _JsonToDictCoder
self.kms_key = kms_key
self.export_result = None
self.options = pipeline_options
self.bq_io_metadata = None # Populate in setup, as it may make an RPC
self.bigquery_job_labels = bigquery_job_labels or {}
self.use_json_exports = use_json_exports
self.temp_dataset = temp_dataset
self.query_priority = query_priority
self._job_name = job_name or 'BQ_EXPORT_JOB'
self._step_name = step_name
self._source_uuid = unique_id
def _get_bq_metadata(self):
if not self.bq_io_metadata:
self.bq_io_metadata = create_bigquery_io_metadata(self._step_name)
return self.bq_io_metadata
def display_data(self):
export_format = 'JSON' if self.use_json_exports else 'AVRO'
return {
'method': str(self.method),
'table': str(self.table_reference),
'query': str(self.query),
'project': str(self.project),
'use_legacy_sql': self.use_legacy_sql,
'bigquery_job_labels': json.dumps(self.bigquery_job_labels),
'export_file_format': export_format,
'launchesBigQueryJobs': DisplayDataItem(
True, label="This Dataflow job launches bigquery jobs."),
}
def estimate_size(self):
bq = bigquery_tools.BigQueryWrapper.from_pipeline_options(self.options)
if self.table_reference is not None:
table_ref = self.table_reference
if (isinstance(self.table_reference, vp.ValueProvider) and
self.table_reference.is_accessible()):
table_ref = bigquery_tools.parse_table_reference(
self.table_reference.get(), project=self._get_project())
elif isinstance(self.table_reference, vp.ValueProvider):
# Size estimation is best effort. We return None as we have
# no access to the table that we're querying.
return None
if not table_ref.projectId:
table_ref.projectId = self._get_project()
table = bq.get_table(
table_ref.projectId, table_ref.datasetId, table_ref.tableId)
return int(table.numBytes)
elif self.query is not None and self.query.is_accessible():
project = self._get_project()
query_job_name = bigquery_tools.generate_bq_job_name(
self._job_name,
self._source_uuid,
bigquery_tools.BigQueryJobTypes.QUERY,
'%s_%s' % (int(time.time()), random.randint(0, 1000)))
job = bq._start_query_job(
project,
self.query.get(),
self.use_legacy_sql,
self.flatten_results,
job_id=query_job_name,
priority=self.query_priority,
dry_run=True,
kms_key=self.kms_key,
job_labels=self._get_bq_metadata().add_additional_bq_job_labels(
self.bigquery_job_labels))
if job.statistics.totalBytesProcessed is None:
# Some queries may not have access to `totalBytesProcessed` as a
# result of row-level security.
# > BigQuery hides sensitive statistics on all queries against
# > tables with row-level security.
# See cloud.google.com/bigquery/docs/managing-row-level-security
# and cloud.google.com/bigquery/docs/best-practices-row-level-security
return None
return int(job.statistics.totalBytesProcessed)
else:
# Size estimation is best effort. We return None as we have
# no access to the query that we're running.
return None
def _get_project(self):
"""Returns the project that queries and exports will be billed to."""
project = self.options.view_as(GoogleCloudOptions).project
if isinstance(project, vp.ValueProvider):
project = project.get()
if self.temp_dataset:
return self.temp_dataset.projectId
if not project:
project = self.project
return project
def _create_source(self, path, coder):
if not self.use_json_exports:
return create_avro_source(path)
else:
return TextSource(
path,
min_bundle_size=0,
compression_type=CompressionTypes.UNCOMPRESSED,
strip_trailing_newlines=True,
coder=coder)
def split(self, desired_bundle_size, start_position=None, stop_position=None):
if self.export_result is None:
bq = bigquery_tools.BigQueryWrapper(
temp_dataset_id=(
self.temp_dataset.datasetId if self.temp_dataset else None),
client=bigquery_tools.BigQueryWrapper._bigquery_client(self.options))
if self.query is not None:
self._setup_temporary_dataset(bq)
self.table_reference = self._execute_query(bq)
if isinstance(self.table_reference, vp.ValueProvider):
self.table_reference = bigquery_tools.parse_table_reference(
self.table_reference.get(), project=self._get_project())
elif not self.table_reference.projectId:
self.table_reference.projectId = self._get_project()
Lineage.sources().add(
'bigquery',
self.table_reference.projectId,
self.table_reference.datasetId,
self.table_reference.tableId)
schema, metadata_list = self._export_files(bq)
self.export_result = _BigQueryExportResult(
coder=self.coder(schema),
paths=[metadata.path for metadata in metadata_list])
if self.query is not None:
bq.clean_up_temporary_dataset(self._get_project())
for path in self.export_result.paths:
source = self._create_source(path, self.export_result.coder)
yield SourceBundle(
weight=1.0, source=source, start_position=None, stop_position=None)
def get_range_tracker(self, start_position, stop_position):
class CustomBigQuerySourceRangeTracker(RangeTracker):
"""A RangeTracker that always returns positions as None."""
def start_position(self):
return None
def stop_position(self):
return None
return CustomBigQuerySourceRangeTracker()
def read(self, range_tracker):
raise NotImplementedError('BigQuery source must be split before being read')
@check_accessible(['query'])
def _setup_temporary_dataset(self, bq):
if self.temp_dataset:
# Temp dataset was provided by the user so we can just return.
return
location = bq.get_query_location(
self._get_project(), self.query.get(), self.use_legacy_sql)
bq.create_temporary_dataset(self._get_project(), location)
@check_accessible(['query'])
def _execute_query(self, bq):
query_job_name = bigquery_tools.generate_bq_job_name(
self._job_name,
self._source_uuid,
bigquery_tools.BigQueryJobTypes.QUERY,
'%s_%s' % (int(time.time()), random.randint(0, 1000)))
job = bq._start_query_job(
self._get_project(),
self.query.get(),
self.use_legacy_sql,
self.flatten_results,
job_id=query_job_name,
priority=self.query_priority,
kms_key=self.kms_key,
job_labels=self._get_bq_metadata().add_additional_bq_job_labels(
self.bigquery_job_labels))
job_ref = job.jobReference
bq.wait_for_bq_job(job_ref, max_retries=0)
return bq._get_temp_table(self._get_project())
def _export_files(self, bq):
"""Runs a BigQuery export job.
Returns:
bigquery.TableSchema instance, a list of FileMetadata instances
"""
job_labels = self._get_bq_metadata().add_additional_bq_job_labels(
self.bigquery_job_labels)
export_job_name = bigquery_tools.generate_bq_job_name(
self._job_name,
self._source_uuid,
bigquery_tools.BigQueryJobTypes.EXPORT,
'%s_%s' % (int(time.time()), random.randint(0, 1000)))
temp_location = self.options.view_as(GoogleCloudOptions).temp_location
gcs_location = bigquery_export_destination_uri(
self.gcs_location, temp_location, self._source_uuid)
try:
if self.use_json_exports:
job_ref = bq.perform_extract_job([gcs_location],
export_job_name,
self.table_reference,
bigquery_tools.FileFormat.JSON,
project=self._get_project(),
job_labels=job_labels,
include_header=False)
else:
job_ref = bq.perform_extract_job([gcs_location],
export_job_name,
self.table_reference,
bigquery_tools.FileFormat.AVRO,
project=self._get_project(),
include_header=False,
job_labels=job_labels,
use_avro_logical_types=True)
bq.wait_for_bq_job(job_ref)
except Exception as exn: # pylint: disable=broad-except
# The error messages thrown in this case are generic and misleading,
# so leave this breadcrumb in case it's the root cause.
logging.warning(
"Error exporting table: %s. "
"Note that external tables cannot be exported: "
"https://cloud.google.com/bigquery/docs/external-tables"
"#external_table_limitations",
exn)
raise
metadata_list = FileSystems.match([gcs_location])[0].metadata_list
if isinstance(self.table_reference, vp.ValueProvider):
table_ref = bigquery_tools.parse_table_reference(
self.table_reference.get(), project=self.project)
else:
table_ref = self.table_reference
table = bq.get_table(
table_ref.projectId, table_ref.datasetId, table_ref.tableId)
return table.schema, metadata_list
class _CustomBigQueryStorageSource(BoundedSource):
"""A base class for BoundedSource implementations which read from BigQuery
using the BigQuery Storage API.
Args:
table (str, TableReference): The ID of the table. If **dataset** argument is
:data:`None` then the table argument must contain the entire table
reference specified as: ``'PROJECT:DATASET.TABLE'`` or must specify a
TableReference.
dataset (str): Optional ID of the dataset containing this table or
:data:`None` if the table argument specifies a TableReference.
project (str): Optional ID of the project containing this table or
:data:`None` if the table argument specifies a TableReference.
selected_fields (List[str]): Optional List of names of the fields in the
table that should be read. If empty, all fields will be read. If the
specified field is a nested field, all the sub-fields in the field will be
selected. The output field order is unrelated to the order of fields in
selected_fields.
row_restriction (str): Optional SQL text filtering statement, similar to a
WHERE clause in a query. Aggregates are not supported. Restricted to a
maximum length for 1 MB.
use_native_datetime (bool): If :data:`True`, BigQuery DATETIME fields will
be returned as native Python datetime objects. If :data:`False`,
DATETIME fields will be returned as formatted strings (for example:
2021-01-01T12:59:59). The default is :data:`False`.
"""
# The maximum number of streams which will be requested when creating a read
# session, regardless of the desired bundle size.
MAX_SPLIT_COUNT = 10000
# The minimum number of streams which will be requested when creating a read
# session, regardless of the desired bundle size. Note that the server may
# still choose to return fewer than ten streams based on the layout of the
# table.
MIN_SPLIT_COUNT = 10
def __init__(
self,
method: str,
query_priority: [BigQueryQueryPriority] = BigQueryQueryPriority.BATCH,
table: Optional[Union[str, TableReference]] = None,
dataset: Optional[str] = None,
project: Optional[str] = None,
query: Optional[str] = None,
selected_fields: Optional[List[str]] = None,
row_restriction: Optional[str] = None,
pipeline_options: Optional[GoogleCloudOptions] = None,
unique_id: Optional[uuid.UUID] = None,
bigquery_job_labels: Optional[Dict] = None,
bigquery_dataset_labels: Optional[Dict] = None,
job_name: Optional[str] = None,
step_name: Optional[str] = None,
use_standard_sql: Optional[bool] = False,
flatten_results: Optional[bool] = True,
kms_key: Optional[str] = None,
temp_dataset: Optional[DatasetReference] = None,
temp_table: Optional[TableReference] = None,
use_native_datetime: Optional[bool] = False):
if table is not None and query is not None:
raise ValueError(
'Both a BigQuery table and a query were specified.'
' Please specify only one of these.')
elif table is None and query is None:
raise ValueError('A BigQuery table or a query must be specified')
elif table is not None:
self.table_reference = bigquery_tools.parse_table_reference(
table, dataset, project)
self.query = None