Skip to content

Commit

Permalink
Integrate StreamManager with run_sweep() (#6233)
Browse files Browse the repository at this point in the history
* Integrate StreamManager with run() methods

* Added tests and improved docstring

* Deferred run_batch and run_calibration stream migration

* Fix test failures
  • Loading branch information
verult authored Aug 8, 2023
1 parent 5c36dc0 commit 86479ae
Show file tree
Hide file tree
Showing 7 changed files with 510 additions and 160 deletions.
36 changes: 26 additions & 10 deletions cirq-google/cirq_google/engine/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ async def run_sweep_async(
job_description: Optional[str] = None,
job_labels: Optional[Dict[str, str]] = None,
) -> engine_job.EngineJob:
"""Runs the supplied Circuit via Quantum Engine.Creates
"""Runs the supplied Circuit via Quantum Engine.
In contrast to run, this runs across multiple parameter sweeps, and
does not block until a result is returned.
Expand Down Expand Up @@ -312,20 +312,35 @@ async def run_sweep_async(
Raises:
ValueError: If no gate set is provided.
"""
engine_program = await self.create_program_async(
program, program_id, description=program_description, labels=program_labels
)
return await engine_program.run_sweep_async(
job_id=job_id,
params=params,
repetitions=repetitions,
if not program_id:
program_id = _make_random_id('prog-')
if not job_id:
job_id = _make_random_id('job-')
run_context = self.context._serialize_run_context(params, repetitions)

stream_job_response_future = self.context.client.run_job_over_stream(
project_id=self.project_id,
program_id=str(program_id),
program_description=program_description,
program_labels=program_labels,
code=self.context._serialize_program(program),
job_id=str(job_id),
processor_ids=processor_ids,
description=job_description,
labels=job_labels,
run_context=run_context,
job_description=job_description,
job_labels=job_labels,
)
return engine_job.EngineJob(
self.project_id,
str(program_id),
str(job_id),
self.context,
stream_job_response_future=stream_job_response_future,
)

run_sweep = duet.sync(run_sweep_async)

# TODO(#5996) Migrate to stream client
async def run_batch_async(
self,
programs: Sequence[cirq.AbstractCircuit],
Expand Down Expand Up @@ -406,6 +421,7 @@ async def run_batch_async(

run_batch = duet.sync(run_batch_async)

# TODO(#5996) Migrate to stream client
async def run_calibration_async(
self,
layers: List['cirq_google.CalibrationLayer'],
Expand Down
78 changes: 78 additions & 0 deletions cirq-google/cirq_google/engine/engine_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from cirq._compat import cached_property
from cirq_google.cloud import quantum
from cirq_google.engine.asyncio_executor import AsyncioExecutor
from cirq_google.engine import stream_manager

_M = TypeVar('_M', bound=proto.Message)
_R = TypeVar('_R')
Expand Down Expand Up @@ -105,6 +106,10 @@ async def make_client():

return self._executor.submit(make_client).result()

@cached_property
def _stream_manager(self) -> stream_manager.StreamManager:
return stream_manager.StreamManager(self.grpc_client)

async def _send_request_async(self, func: Callable[[_M], Awaitable[_R]], request: _M) -> _R:
"""Sends a request by invoking an asyncio callable."""
return await self._run_retry_async(func, request)
Expand Down Expand Up @@ -697,6 +702,79 @@ async def get_job_results_async(

get_job_results = duet.sync(get_job_results_async)

def run_job_over_stream(
self,
project_id: str,
program_id: str,
code: any_pb2.Any,
job_id: str,
processor_ids: Sequence[str],
run_context: any_pb2.Any,
program_description: Optional[str] = None,
program_labels: Optional[Dict[str, str]] = None,
priority: Optional[int] = None,
job_description: Optional[str] = None,
job_labels: Optional[Dict[str, str]] = None,
) -> duet.AwaitableFuture[Union[quantum.QuantumResult, quantum.QuantumJob]]:
"""Runs a job with the given program and job information over a stream.
Sends the request over the Quantum Engine QuantumRunStream bidirectional stream, and returns
a future for the stream response. The future will be completed with a `QuantumResult` if
the job is successful; otherwise, it will be completed with a QuantumJob.
Args:
project_id: A project_id of the parent Google Cloud Project.
program_id: Unique ID of the program within the parent project.
code: Properly serialized program code.
job_id: Unique ID of the job within the parent program.
run_context: Properly serialized run context.
processor_ids: List of processor id for running the program.
program_description: An optional description to set on the program.
program_labels: Optional set of labels to set on the program.
priority: Optional priority to run at, 0-1000.
job_description: Optional description to set on the job.
job_labels: Optional set of labels to set on the job.
Returns:
A future for the job result, or the job if the job has failed.
Raises:
ValueError: If the priority is not between 0 and 1000.
"""
# Check program to run and program parameters.
if priority and not 0 <= priority < 1000:
raise ValueError('priority must be between 0 and 1000')

project_name = _project_name(project_id)

program_name = _program_name_from_ids(project_id, program_id)
program = quantum.QuantumProgram(name=program_name, code=code)
if program_description:
program.description = program_description
if program_labels:
program.labels.update(program_labels)

job = quantum.QuantumJob(
name=_job_name_from_ids(project_id, program_id, job_id),
scheduling_config=quantum.SchedulingConfig(
processor_selector=quantum.SchedulingConfig.ProcessorSelector(
processor_names=[
_processor_name_from_ids(project_id, processor_id)
for processor_id in processor_ids
]
)
),
run_context=run_context,
)
if priority:
job.scheduling_config.priority = priority
if job_description:
job.description = job_description
if job_labels:
job.labels.update(job_labels)

return self._stream_manager.submit(project_name, program, job)

async def list_processors_async(self, project_id: str) -> List[quantum.QuantumProcessor]:
"""Returns a list of Processors that the user has visibility to in the
current Engine project. The names of these processors are used to
Expand Down
Loading

0 comments on commit 86479ae

Please sign in to comment.