Skip to content

Commit

Permalink
StreamManager: retry with get result request on already exist errors (#…
Browse files Browse the repository at this point in the history
…6345)

* StreamManager: retry with get result request on already exist errors

* Fix unused import

* Clarified comment for 'program already exists' retry
  • Loading branch information
verult authored Nov 15, 2023
1 parent 0e288a7 commit 392083b
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 22 deletions.
30 changes: 12 additions & 18 deletions cirq-google/cirq_google/engine/stream_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,6 @@
]


class ProgramAlreadyExistsError(Exception):
def __init__(self, program_name: str):
# Call the base class constructor with the parameters it needs
super().__init__(f"'{program_name}' already exists")


class StreamError(Exception):
pass

Expand Down Expand Up @@ -150,7 +144,6 @@ def submit(
A future for the job result, or the job if the job has failed.
Raises:
ProgramAlreadyExistsError: if the program already exists.
StreamError: if there is a non-retryable error while executing the job.
ValueError: if program name is not set.
concurrent.futures.CancelledError: if the stream is stopped while a job is in flight.
Expand Down Expand Up @@ -298,6 +291,7 @@ async def _manage_execution(
current_request,
create_program_and_job_request,
_to_create_job_request(create_program_and_job_request),
_to_get_result_request(create_program_and_job_request),
)
continue
else: # pragma: no cover
Expand All @@ -319,7 +313,8 @@ def _get_retry_request_or_raise(
error: quantum.StreamError,
current_request,
create_program_and_job_request,
create_job_request: quantum.QuantumRunStreamRequest,
create_job_request,
get_result_request: quantum.QuantumRunStreamRequest,
):
"""Decide whether the given stream error is retryable.
Expand All @@ -330,19 +325,18 @@ def _get_retry_request_or_raise(
return create_program_and_job_request
elif error.code == Code.PROGRAM_ALREADY_EXISTS:
if 'create_quantum_program_and_job' in current_request:
raise ProgramAlreadyExistsError(
current_request.create_quantum_program_and_job.quantum_program.name
)
# If the program already exists and was created by the stream client, the job
# should also exist because the two are created at the same time.
# If the job is missing, the program was created outside StreamManager. In that
# case, a `CreateQuantumJobRequest` is issued after the `GetQuantumResultRequest`
# attempt fails with a "job does not exist" error.
return get_result_request
elif error.code == Code.JOB_DOES_NOT_EXIST:
if 'get_quantum_result' in current_request:
return create_job_request

# Code.JOB_ALREADY_EXISTS should never happen.
# The first stream request is always a CreateQuantumProgramAndJobRequest, which never fails
# with this error because jobs are scoped within a program.
# CreateQuantumJobRequests would fail with a PROGRAM_ALREADY_EXISTS if the job already
# exists because program and job creation happen atomically for a
# CreateQuantumProgramAndJobRequest.
elif error.code == Code.JOB_ALREADY_EXISTS:
if not 'get_quantum_result' in current_request:
return get_result_request

raise StreamError(error.message)

Expand Down
105 changes: 101 additions & 4 deletions cirq-google/cirq_google/engine/stream_manager_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
from cirq_google.engine.asyncio_executor import AsyncioExecutor
from cirq_google.engine.stream_manager import (
_get_retry_request_or_raise,
ProgramAlreadyExistsError,
ResponseDemux,
StreamError,
StreamManager,
Expand Down Expand Up @@ -524,9 +523,40 @@ async def test():
duet.run(test)

@mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True)
def test_submit_program_already_exists_expects_program_already_exists_error(
def test_submit_program_already_exists_expects_get_result_request(self, client_constructor):
expected_result = quantum.QuantumResult(parent='projects/proj/programs/prog/jobs/job0')
fake_client, manager = setup(client_constructor)

async def test():
async with duet.timeout_scope(5):
actual_result_future = manager.submit(
REQUEST_PROJECT_NAME, REQUEST_PROGRAM, REQUEST_JOB0
)
await fake_client.wait_for_requests()
await fake_client.reply(
quantum.QuantumRunStreamResponse(
error=quantum.StreamError(
code=quantum.StreamError.Code.PROGRAM_ALREADY_EXISTS
)
)
)
await fake_client.wait_for_requests()
await fake_client.reply(quantum.QuantumRunStreamResponse(result=expected_result))
actual_result = await actual_result_future
manager.stop()

assert actual_result == expected_result
assert len(fake_client.all_stream_requests) == 2
assert 'create_quantum_program_and_job' in fake_client.all_stream_requests[0]
assert 'get_quantum_result' in fake_client.all_stream_requests[1]

duet.run(test)

@mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True)
def test_submit_program_already_exists_but_job_does_not_exist_expects_create_job_request(
self, client_constructor
):
expected_result = quantum.QuantumResult(parent='projects/proj/programs/prog/jobs/job0')
fake_client, manager = setup(client_constructor)

async def test():
Expand All @@ -542,10 +572,75 @@ async def test():
)
)
)
with pytest.raises(ProgramAlreadyExistsError):
await actual_result_future
await fake_client.wait_for_requests()
await fake_client.reply(
quantum.QuantumRunStreamResponse(
error=quantum.StreamError(code=quantum.StreamError.Code.JOB_DOES_NOT_EXIST)
)
)
await fake_client.wait_for_requests()
await fake_client.reply(quantum.QuantumRunStreamResponse(result=expected_result))
actual_result = await actual_result_future
manager.stop()

assert actual_result == expected_result
assert len(fake_client.all_stream_requests) == 3
assert 'create_quantum_program_and_job' in fake_client.all_stream_requests[0]
assert 'get_quantum_result' in fake_client.all_stream_requests[1]
assert 'create_quantum_job' in fake_client.all_stream_requests[2]

duet.run(test)

@mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True)
def test_submit_job_already_exist_expects_get_result_request(self, client_constructor):
"""Verifies the behavior when the client receives a JOB_ALREADY_EXISTS error.
This error is only expected to be triggered in the following race condition:
1. The client sends a CreateQuantumProgramAndJobRequest.
2. The client's stream disconnects.
3. The client retries with a new stream and a GetQuantumResultRequest.
4. The job doesn't exist yet, and the client receives a "job not found" error.
5. Scheduler creates the program and job.
6. The client retries with a CreateQuantumJobRequest and fails with a "job already exists" error.
The JOB_ALREADY_EXISTS error from `CreateQuantumJobRequest` is only possible if the job
did not yet exist at the time of the last `GetQuantumResultRequest`.
"""
expected_result = quantum.QuantumResult(parent='projects/proj/programs/prog/jobs/job0')
fake_client, manager = setup(client_constructor)

async def test():
async with duet.timeout_scope(5):
actual_result_future = manager.submit(
REQUEST_PROJECT_NAME, REQUEST_PROGRAM, REQUEST_JOB0
)
await fake_client.wait_for_requests()
await fake_client.reply(google_exceptions.ServiceUnavailable('unavailable'))
await fake_client.wait_for_requests()
# Trigger a retry with `CreateQuantumJobRequest`.
await fake_client.reply(
quantum.QuantumRunStreamResponse(
error=quantum.StreamError(code=quantum.StreamError.Code.JOB_DOES_NOT_EXIST)
)
)
await fake_client.wait_for_requests()
await fake_client.reply(
quantum.QuantumRunStreamResponse(
error=quantum.StreamError(code=quantum.StreamError.Code.JOB_ALREADY_EXISTS)
)
)
await fake_client.wait_for_requests()
await fake_client.reply(quantum.QuantumRunStreamResponse(result=expected_result))
actual_result = await actual_result_future
manager.stop()

assert actual_result == expected_result
assert len(fake_client.all_stream_requests) == 4
assert 'create_quantum_program_and_job' in fake_client.all_stream_requests[0]
assert 'get_quantum_result' in fake_client.all_stream_requests[1]
assert 'create_quantum_job' in fake_client.all_stream_requests[2]
assert 'get_quantum_result' in fake_client.all_stream_requests[3]

duet.run(test)

@mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True)
Expand Down Expand Up @@ -690,6 +785,7 @@ async def test():
(Code.PROGRAM_ALREADY_EXISTS, 'get_quantum_result'),
(Code.JOB_DOES_NOT_EXIST, 'create_quantum_program_and_job'),
(Code.JOB_DOES_NOT_EXIST, 'create_quantum_job'),
(Code.JOB_ALREADY_EXISTS, 'get_quantum_result'),
],
)
def test_get_retry_request_or_raise_expects_stream_error(
Expand Down Expand Up @@ -720,6 +816,7 @@ def test_get_retry_request_or_raise_expects_stream_error(
current_request,
create_quantum_program_and_job_request,
create_quantum_job_request,
get_quantum_result_request,
)

@mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True)
Expand Down

0 comments on commit 392083b

Please sign in to comment.