From 2c075d3955ef27195fab33c40316b9ae4fa66b16 Mon Sep 17 00:00:00 2001 From: Astha Mohta Date: Thu, 20 Apr 2023 14:59:26 +0530 Subject: [PATCH 1/6] fix: rst retry for txn --- google/cloud/spanner_v1/_helpers.py | 50 ++++++++++++++++++++++++++ google/cloud/spanner_v1/batch.py | 11 +++++- google/cloud/spanner_v1/snapshot.py | 23 ++++++++++-- google/cloud/spanner_v1/transaction.py | 34 +++++++++++++++--- 4 files changed, 109 insertions(+), 9 deletions(-) diff --git a/google/cloud/spanner_v1/_helpers.py b/google/cloud/spanner_v1/_helpers.py index b364514d09..83737313fe 100644 --- a/google/cloud/spanner_v1/_helpers.py +++ b/google/cloud/spanner_v1/_helpers.py @@ -17,6 +17,7 @@ import datetime import decimal import math +import time from google.protobuf.struct_pb2 import ListValue from google.protobuf.struct_pb2 import Value @@ -292,3 +293,52 @@ def _metadata_with_prefix(prefix, **kw): List[Tuple[str, str]]: RPC metadata with supplied prefix """ return [("google-cloud-resource-prefix", prefix)] + + +def _retry( + func, + retry_count=5, + delay=2, + allowed_exceptions={ + Exception: None, + }, +): + """ + Retry a function with a specified number of retries, delay between retries, and list of allowed exceptions. + + Args: + func: The function to be retried. + retry_count: The maximum number of times to retry the function. + delay: The delay in seconds between retries. + allowed_exceptions: A tuple of exceptions that are allowed to occur without triggering a retry. + + Returns: + The result of the function if it is successful, or raises the last exception if all retries fail. + """ + for retries in range(retry_count): + try: + result = func() + except Exception as exc: + if exc in allowed_exceptions and retries < retry_count: + if allowed_exceptions[exc] is not None: + allowed_exceptions[exc](exc) + time.sleep(delay) + delay = delay * 2 + else: + raise exc + else: + return result + + +def _check_rst_stream_error(exc): + resumable_error = ( + any( + resumable_message in exc.message + for resumable_message in ( + "RST_STREAM", + "Received unexpected EOS on DATA frame from server", + ) + ), + ) + if not resumable_error: + raise exc diff --git a/google/cloud/spanner_v1/batch.py b/google/cloud/spanner_v1/batch.py index 48c533d2cd..195b6fb2a2 100644 --- a/google/cloud/spanner_v1/batch.py +++ b/google/cloud/spanner_v1/batch.py @@ -13,6 +13,7 @@ # limitations under the License. """Context manager for Cloud Spanner batched writes.""" +import functools from google.cloud.spanner_v1 import CommitRequest from google.cloud.spanner_v1 import Mutation @@ -23,6 +24,9 @@ from google.cloud.spanner_v1._helpers import _metadata_with_prefix from google.cloud.spanner_v1._opentelemetry_tracing import trace_call from google.cloud.spanner_v1 import RequestOptions +from google.cloud.spanner_v1._helpers import _retry +from google.cloud.spanner_v1._helpers import _check_rst_stream_error +from google.api_core.exceptions import InternalServerError class _BatchBase(_SessionWrapper): @@ -179,10 +183,15 @@ def commit(self, return_commit_stats=False, request_options=None): request_options=request_options, ) with trace_call("CloudSpanner.Commit", self._session, trace_attributes): - response = api.commit( + method = functools.partial( + api.api.commit, request=request, metadata=metadata, ) + response = _retry( + method, + allowed_exceptions={InternalServerError: _check_rst_stream_error}, + ) self.committed = response.commit_timestamp self.commit_stats = response.commit_stats return self.committed diff --git a/google/cloud/spanner_v1/snapshot.py b/google/cloud/spanner_v1/snapshot.py index 362e5dd1bc..43fc63a1e8 100644 --- a/google/cloud/spanner_v1/snapshot.py +++ b/google/cloud/spanner_v1/snapshot.py @@ -33,6 +33,8 @@ from google.cloud.spanner_v1._helpers import _merge_query_options from google.cloud.spanner_v1._helpers import _metadata_with_prefix from google.cloud.spanner_v1._helpers import _SessionWrapper +from google.cloud.spanner_v1._helpers import _retry +from google.cloud.spanner_v1._helpers import _check_rst_stream_error from google.cloud.spanner_v1._opentelemetry_tracing import trace_call from google.cloud.spanner_v1.streamed import StreamedResultSet from google.cloud.spanner_v1 import RequestOptions @@ -545,12 +547,17 @@ def partition_read( with trace_call( "CloudSpanner.PartitionReadOnlyTransaction", self._session, trace_attributes ): - response = api.partition_read( + method = functools.partial( + api.partition_read, request=request, metadata=metadata, retry=retry, timeout=timeout, ) + response = _retry( + method, + allowed_exceptions={InternalServerError: _check_rst_stream_error}, + ) return [partition.partition_token for partition in response.partitions] @@ -640,12 +647,17 @@ def partition_query( self._session, trace_attributes, ): - response = api.partition_query( + method = functools.partial( + api.partition_query, request=request, metadata=metadata, retry=retry, timeout=timeout, ) + response = _retry( + method, + allowed_exceptions={InternalServerError: _check_rst_stream_error}, + ) return [partition.partition_token for partition in response.partitions] @@ -768,10 +780,15 @@ def begin(self): metadata = _metadata_with_prefix(database.name) txn_selector = self._make_txn_selector() with trace_call("CloudSpanner.BeginTransaction", self._session): - response = api.begin_transaction( + method = functools.partial( + api.begin_transaction, session=self._session.name, options=txn_selector.begin, metadata=metadata, ) + response = _retry( + method, + allowed_exceptions={InternalServerError: _check_rst_stream_error}, + ) self._transaction_id = response.id return self._transaction_id diff --git a/google/cloud/spanner_v1/transaction.py b/google/cloud/spanner_v1/transaction.py index ce34054ab9..e490154e8f 100644 --- a/google/cloud/spanner_v1/transaction.py +++ b/google/cloud/spanner_v1/transaction.py @@ -32,6 +32,9 @@ from google.cloud.spanner_v1._opentelemetry_tracing import trace_call from google.cloud.spanner_v1 import RequestOptions from google.api_core import gapic_v1 +from google.cloud.spanner_v1._helpers import _retry +from google.cloud.spanner_v1._helpers import _check_rst_stream_error +from google.api_core.exceptions import InternalServerError class Transaction(_SnapshotBase, _BatchBase): @@ -100,7 +103,11 @@ def _execute_request( transaction = self._make_txn_selector() request.transaction = transaction with trace_call(trace_name, session, attributes): - response = method(request=request) + method = functools.partial(method, request=request) + response = _retry( + method, + allowed_exceptions={InternalServerError: _check_rst_stream_error}, + ) return response @@ -126,8 +133,15 @@ def begin(self): metadata = _metadata_with_prefix(database.name) txn_options = TransactionOptions(read_write=TransactionOptions.ReadWrite()) with trace_call("CloudSpanner.BeginTransaction", self._session): - response = api.begin_transaction( - session=self._session.name, options=txn_options, metadata=metadata + method = functools.partial( + api.begin_transaction, + session=self._session.name, + options=txn_options, + metadata=metadata, + ) + response = _retry( + method, + allowed_exceptions={InternalServerError: _check_rst_stream_error}, ) self._transaction_id = response.id return self._transaction_id @@ -141,11 +155,16 @@ def rollback(self): api = database.spanner_api metadata = _metadata_with_prefix(database.name) with trace_call("CloudSpanner.Rollback", self._session): - api.rollback( + method = functools.partial( + api.rollback, session=self._session.name, transaction_id=self._transaction_id, metadata=metadata, ) + _retry( + method, + allowed_exceptions={InternalServerError: _check_rst_stream_error}, + ) self.rolled_back = True del self._session._transaction @@ -196,10 +215,15 @@ def commit(self, return_commit_stats=False, request_options=None): request_options=request_options, ) with trace_call("CloudSpanner.Commit", self._session, trace_attributes): - response = api.commit( + method = functools.partial( + api.commit, request=request, metadata=metadata, ) + response = _retry( + method, + allowed_exceptions={InternalServerError: _check_rst_stream_error}, + ) self.committed = response.commit_timestamp if return_commit_stats: self.commit_stats = response.commit_stats From 72d18aea97d856dbfa9d3132b912ec48696c504f Mon Sep 17 00:00:00 2001 From: Astha Mohta Date: Thu, 27 Apr 2023 12:24:59 +0530 Subject: [PATCH 2/6] rst changes and tests --- google/cloud/spanner_v1/_helpers.py | 21 ++++---- google/cloud/spanner_v1/batch.py | 2 +- tests/unit/test__helpers.py | 74 +++++++++++++++++++++++++++++ tests/unit/test_snapshot.py | 19 ++++++++ tests/unit/test_transaction.py | 19 ++++++++ 5 files changed, 126 insertions(+), 9 deletions(-) diff --git a/google/cloud/spanner_v1/_helpers.py b/google/cloud/spanner_v1/_helpers.py index 83737313fe..d4d91131ab 100644 --- a/google/cloud/spanner_v1/_helpers.py +++ b/google/cloud/spanner_v1/_helpers.py @@ -299,9 +299,7 @@ def _retry( func, retry_count=5, delay=2, - allowed_exceptions={ - Exception: None, - }, + allowed_exceptions=None, ): """ Retry a function with a specified number of retries, delay between retries, and list of allowed exceptions. @@ -315,15 +313,22 @@ def _retry( Returns: The result of the function if it is successful, or raises the last exception if all retries fail. """ - for retries in range(retry_count): + retries = 0 + while retries <= retry_count: try: result = func() except Exception as exc: - if exc in allowed_exceptions and retries < retry_count: - if allowed_exceptions[exc] is not None: - allowed_exceptions[exc](exc) + if ( + allowed_exceptions is None or exc.__class__ in allowed_exceptions + ) and retries < retry_count: + if ( + allowed_exceptions is not None + and allowed_exceptions[exc.__class__] is not None + ): + allowed_exceptions[exc.__class__](exc) time.sleep(delay) delay = delay * 2 + retries = retries + 1 else: raise exc else: @@ -341,4 +346,4 @@ def _check_rst_stream_error(exc): ), ) if not resumable_error: - raise exc + raise diff --git a/google/cloud/spanner_v1/batch.py b/google/cloud/spanner_v1/batch.py index 195b6fb2a2..ac62271eec 100644 --- a/google/cloud/spanner_v1/batch.py +++ b/google/cloud/spanner_v1/batch.py @@ -184,7 +184,7 @@ def commit(self, return_commit_stats=False, request_options=None): ) with trace_call("CloudSpanner.Commit", self._session, trace_attributes): method = functools.partial( - api.api.commit, + api.commit, request=request, metadata=metadata, ) diff --git a/tests/unit/test__helpers.py b/tests/unit/test__helpers.py index 21434da191..6944852977 100644 --- a/tests/unit/test__helpers.py +++ b/tests/unit/test__helpers.py @@ -14,6 +14,7 @@ import unittest +import mock class Test_merge_query_options(unittest.TestCase): @@ -669,3 +670,76 @@ def test(self): prefix = "prefix" metadata = self._call_fut(prefix) self.assertEqual(metadata, [("google-cloud-resource-prefix", prefix)]) + + +class Test_retry(unittest.TestCase): + class test_class: + def test_fxn(self): + return True + + def test_retry_on_error(self): + from google.api_core.exceptions import InternalServerError, NotFound + from google.cloud.spanner_v1._helpers import _retry + import functools + + test_api = mock.create_autospec(self.test_class) + test_api.test_fxn.side_effect = [ + InternalServerError("testing"), + NotFound("testing"), + True, + ] + + _retry(functools.partial(test_api.test_fxn)) + + self.assertEqual(test_api.test_fxn.call_count, 3) + + def test_retry_allowed_exceptions(self): + from google.api_core.exceptions import InternalServerError, NotFound + from google.cloud.spanner_v1._helpers import _retry + import functools + + test_api = mock.create_autospec(self.test_class) + test_api.test_fxn.side_effect = [ + InternalServerError("testing"), + InternalServerError("testing"), + True, + ] + + with self.assertRaises(InternalServerError): + _retry( + functools.partial(test_api.test_fxn), + allowed_exceptions={NotFound: None}, + ) + + def test_retry_count(self): + from google.api_core.exceptions import InternalServerError + from google.cloud.spanner_v1._helpers import _retry + import functools + + test_api = mock.create_autospec(self.test_class) + test_api.test_fxn.side_effect = [ + InternalServerError("testing"), + InternalServerError("testing"), + ] + + with self.assertRaises(InternalServerError): + _retry(functools.partial(test_api.test_fxn), retry_count=1) + + def test_check_rst_stream_error(self): + from google.api_core.exceptions import InternalServerError + from google.cloud.spanner_v1._helpers import _retry, _check_rst_stream_error + import functools + + test_api = mock.create_autospec(self.test_class) + test_api.test_fxn.side_effect = [ + InternalServerError("Received unexpected EOS on DATA frame from server"), + InternalServerError("RST_STREAM"), + True, + ] + + _retry( + functools.partial(test_api.test_fxn), + allowed_exceptions={InternalServerError: _check_rst_stream_error}, + ) + + self.assertEqual(test_api.test_fxn.call_count, 3) diff --git a/tests/unit/test_snapshot.py b/tests/unit/test_snapshot.py index c3ea162f11..07c199f450 100644 --- a/tests/unit/test_snapshot.py +++ b/tests/unit/test_snapshot.py @@ -1603,6 +1603,25 @@ def test_begin_w_other_error(self): attributes=BASE_ATTRIBUTES, ) + def test_begin_w_retry(self): + from google.cloud.spanner_v1 import ( + Transaction as TransactionPB, + ) + from google.api_core.exceptions import InternalServerError + + database = _Database() + api = database.spanner_api = self._make_spanner_api() + database.spanner_api.begin_transaction.side_effect = [ + InternalServerError("Received unexpected EOS on DATA frame from server"), + TransactionPB(id=TXN_ID), + ] + timestamp = self._makeTimestamp() + session = _Session(database) + snapshot = self._make_one(session, read_timestamp=timestamp, multi_use=True) + + snapshot.begin() + self.assertEqual(api.begin_transaction.call_count, 2) + def test_begin_ok_exact_staleness(self): from google.protobuf.duration_pb2 import Duration from google.cloud.spanner_v1 import ( diff --git a/tests/unit/test_transaction.py b/tests/unit/test_transaction.py index 5fb69b4979..5fefd0a4ae 100644 --- a/tests/unit/test_transaction.py +++ b/tests/unit/test_transaction.py @@ -188,6 +188,25 @@ def test_begin_ok(self): "CloudSpanner.BeginTransaction", attributes=TestTransaction.BASE_ATTRIBUTES ) + def test_begin_w_retry(self): + from google.cloud.spanner_v1 import ( + Transaction as TransactionPB, + ) + from google.api_core.exceptions import InternalServerError + + database = _Database() + api = database.spanner_api = self._make_spanner_api() + database.spanner_api.begin_transaction.side_effect = [ + InternalServerError("Received unexpected EOS on DATA frame from server"), + TransactionPB(id=self.TRANSACTION_ID), + ] + + session = _Session(database) + transaction = self._make_one(session) + transaction.begin() + + self.assertEqual(api.begin_transaction.call_count, 2) + def test_rollback_not_begun(self): database = _Database() api = database.spanner_api = self._make_spanner_api() From 02cccfe34cdd87d06b3c30671c9216e8317be46e Mon Sep 17 00:00:00 2001 From: Astha Mohta Date: Thu, 27 Apr 2023 12:27:50 +0530 Subject: [PATCH 3/6] fix --- tests/unit/spanner_dbapi/test_connection.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/unit/spanner_dbapi/test_connection.py b/tests/unit/spanner_dbapi/test_connection.py index 7a0ac9e687..3b45f9d105 100644 --- a/tests/unit/spanner_dbapi/test_connection.py +++ b/tests/unit/spanner_dbapi/test_connection.py @@ -169,7 +169,7 @@ def test__session_checkout(self, mock_database): connection._session_checkout() self.assertEqual(connection._session, "db_session") - def test__session_checkout_database_error(self): + def test_session_checkout_database_error(self): from google.cloud.spanner_dbapi import Connection connection = Connection(INSTANCE) @@ -190,7 +190,7 @@ def test__release_session(self, mock_database): pool.put.assert_called_once_with("session") self.assertIsNone(connection._session) - def test__release_session_database_error(self): + def test_release_session_database_error(self): from google.cloud.spanner_dbapi import Connection connection = Connection(INSTANCE) From 3ce6df929a247df5dcd33b7e26c4048ab79b45e1 Mon Sep 17 00:00:00 2001 From: Astha Mohta Date: Wed, 24 May 2023 11:33:58 +0530 Subject: [PATCH 4/6] rst stream comment changes --- google/cloud/spanner_v1/_helpers.py | 5 ++--- tests/unit/test__helpers.py | 6 ++++- tests/unit/test_snapshot.py | 34 +++++++++++++++++++++++++++++ 3 files changed, 41 insertions(+), 4 deletions(-) diff --git a/google/cloud/spanner_v1/_helpers.py b/google/cloud/spanner_v1/_helpers.py index d4d91131ab..b4ff516f69 100644 --- a/google/cloud/spanner_v1/_helpers.py +++ b/google/cloud/spanner_v1/_helpers.py @@ -309,6 +309,7 @@ def _retry( retry_count: The maximum number of times to retry the function. delay: The delay in seconds between retries. allowed_exceptions: A tuple of exceptions that are allowed to occur without triggering a retry. + Passing allowed_exceptions as None will lead to retrying for all exceptions. Returns: The result of the function if it is successful, or raises the last exception if all retries fail. @@ -316,7 +317,7 @@ def _retry( retries = 0 while retries <= retry_count: try: - result = func() + return func() except Exception as exc: if ( allowed_exceptions is None or exc.__class__ in allowed_exceptions @@ -331,8 +332,6 @@ def _retry( retries = retries + 1 else: raise exc - else: - return result def _check_rst_stream_error(exc): diff --git a/tests/unit/test__helpers.py b/tests/unit/test__helpers.py index 6944852977..929721c5ed 100644 --- a/tests/unit/test__helpers.py +++ b/tests/unit/test__helpers.py @@ -700,7 +700,7 @@ def test_retry_allowed_exceptions(self): test_api = mock.create_autospec(self.test_class) test_api.test_fxn.side_effect = [ - InternalServerError("testing"), + NotFound("testing"), InternalServerError("testing"), True, ] @@ -711,6 +711,8 @@ def test_retry_allowed_exceptions(self): allowed_exceptions={NotFound: None}, ) + self.assertEqual(test_api.test_fxn.call_count, 2) + def test_retry_count(self): from google.api_core.exceptions import InternalServerError from google.cloud.spanner_v1._helpers import _retry @@ -725,6 +727,8 @@ def test_retry_count(self): with self.assertRaises(InternalServerError): _retry(functools.partial(test_api.test_fxn), retry_count=1) + self.assertEqual(test_api.test_fxn.call_count, 2) + def test_check_rst_stream_error(self): from google.api_core.exceptions import InternalServerError from google.cloud.spanner_v1._helpers import _retry, _check_rst_stream_error diff --git a/tests/unit/test_snapshot.py b/tests/unit/test_snapshot.py index 07c199f450..d57e4bdf60 100644 --- a/tests/unit/test_snapshot.py +++ b/tests/unit/test_snapshot.py @@ -1152,6 +1152,40 @@ def test_partition_read_other_error(self): ), ) + def test_partition_read_w_retry(self): + from google.cloud.spanner_v1.keyset import KeySet + from google.api_core.exceptions import InternalServerError + from google.cloud.spanner_v1 import Partition + from google.cloud.spanner_v1 import PartitionResponse + from google.cloud.spanner_v1 import Transaction + + keyset = KeySet(all_=True) + database = _Database() + api = database.spanner_api = self._make_spanner_api() + new_txn_id = b"ABECAB91" + token_1 = b"FACE0FFF" + token_2 = b"BADE8CAF" + response = PartitionResponse( + partitions=[ + Partition(partition_token=token_1), + Partition(partition_token=token_2), + ], + transaction=Transaction(id=new_txn_id), + ) + database.spanner_api.partition_read.side_effect = [ + InternalServerError("Received unexpected EOS on DATA frame from server"), + response, + ] + + session = _Session(database) + derived = self._makeDerived(session) + derived._multi_use = True + derived._transaction_id = TXN_ID + + list(derived.partition_read(TABLE_NAME, COLUMNS, keyset)) + + self.assertEqual(api.partition_read.call_count, 2) + def test_partition_read_ok_w_index_no_options(self): self._partition_read_helper(multi_use=True, w_txn=True, index="index") From 3ce19a67a11b6e2d47b8de48cd65af61c593651a Mon Sep 17 00:00:00 2001 From: Astha Mohta Date: Wed, 24 May 2023 11:38:34 +0530 Subject: [PATCH 5/6] lint --- google/cloud/spanner_v1/_helpers.py | 1 - tests/unit/test__helpers.py | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/google/cloud/spanner_v1/_helpers.py b/google/cloud/spanner_v1/_helpers.py index bf9a91d499..4f708b20cf 100644 --- a/google/cloud/spanner_v1/_helpers.py +++ b/google/cloud/spanner_v1/_helpers.py @@ -358,4 +358,3 @@ def _metadata_with_leader_aware_routing(value, **kw): List[Tuple[str, str]]: RPC metadata with leader aware routing header """ return ("x-goog-spanner-route-to-leader", str(value).lower()) - diff --git a/tests/unit/test__helpers.py b/tests/unit/test__helpers.py index 5dfe6ef00b..0e0ec903a2 100644 --- a/tests/unit/test__helpers.py +++ b/tests/unit/test__helpers.py @@ -672,7 +672,6 @@ def test(self): self.assertEqual(metadata, [("google-cloud-resource-prefix", prefix)]) - class Test_retry(unittest.TestCase): class test_class: def test_fxn(self): @@ -749,6 +748,7 @@ def test_check_rst_stream_error(self): self.assertEqual(test_api.test_fxn.call_count, 3) + class Test_metadata_with_leader_aware_routing(unittest.TestCase): def _call_fut(self, *args, **kw): from google.cloud.spanner_v1._helpers import _metadata_with_leader_aware_routing From 59f0b038cafb4867e9da09a4c958008595e66875 Mon Sep 17 00:00:00 2001 From: Astha Mohta Date: Wed, 24 May 2023 12:34:15 +0530 Subject: [PATCH 6/6] lint --- google/cloud/spanner_v1/snapshot.py | 10 +++++----- google/cloud/spanner_v1/transaction.py | 4 ++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/google/cloud/spanner_v1/snapshot.py b/google/cloud/spanner_v1/snapshot.py index ffe0788626..6d17bfc386 100644 --- a/google/cloud/spanner_v1/snapshot.py +++ b/google/cloud/spanner_v1/snapshot.py @@ -29,15 +29,15 @@ from google.api_core.exceptions import ServiceUnavailable from google.api_core.exceptions import InvalidArgument from google.api_core import gapic_v1 -from google.cloud.spanner_v1._helpers import _make_value_pb -from google.cloud.spanner_v1._helpers import _merge_query_options from google.cloud.spanner_v1._helpers import ( + _make_value_pb, + _merge_query_options, _metadata_with_prefix, _metadata_with_leader_aware_routing, + _retry, + _check_rst_stream_error, + _SessionWrapper, ) -from google.cloud.spanner_v1._helpers import _SessionWrapper -from google.cloud.spanner_v1._helpers import _retry -from google.cloud.spanner_v1._helpers import _check_rst_stream_error from google.cloud.spanner_v1._opentelemetry_tracing import trace_call from google.cloud.spanner_v1.streamed import StreamedResultSet from google.cloud.spanner_v1 import RequestOptions diff --git a/google/cloud/spanner_v1/transaction.py b/google/cloud/spanner_v1/transaction.py index cd65f27b08..dee99a0c6f 100644 --- a/google/cloud/spanner_v1/transaction.py +++ b/google/cloud/spanner_v1/transaction.py @@ -22,6 +22,8 @@ _merge_query_options, _metadata_with_prefix, _metadata_with_leader_aware_routing, + _retry, + _check_rst_stream_error, ) from google.cloud.spanner_v1 import CommitRequest from google.cloud.spanner_v1 import ExecuteBatchDmlRequest @@ -33,8 +35,6 @@ from google.cloud.spanner_v1._opentelemetry_tracing import trace_call from google.cloud.spanner_v1 import RequestOptions from google.api_core import gapic_v1 -from google.cloud.spanner_v1._helpers import _retry -from google.cloud.spanner_v1._helpers import _check_rst_stream_error from google.api_core.exceptions import InternalServerError