Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: remove the unused retry index field #11903

Merged
merged 12 commits into from
Dec 23, 2024
Prev Previous commit
Next Next commit
fix: replace Exception with ValueError for uninitialized workflow run…
… and graph runtime state

Signed-off-by: -LAN- <laipz8200@outlook.com>
  • Loading branch information
laipz8200 committed Dec 23, 2024
commit cdc854d15be6c51ebdee33dc697d6c896e87afad
28 changes: 15 additions & 13 deletions api/core/app/apps/workflow/generate_task_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ def _wrapper_process_stream_response(
break
else:
yield MessageAudioStreamResponse(audio=audio_trunk.audio, task_id=task_id)
except Exception as e:
except Exception:
logger.exception(f"Fails to get audio trunk, task_id: {task_id}")
break
if tts_publisher:
Expand Down Expand Up @@ -258,6 +258,8 @@ def _process_stream_response(
event,
QueueNodeRetryEvent,
):
if not workflow_run:
raise ValueError("workflow run not initialized.")
workflow_node_execution = self._handle_workflow_node_execution_retried(
workflow_run=workflow_run, event=event
)
Expand All @@ -272,7 +274,7 @@ def _process_stream_response(
yield response
elif isinstance(event, QueueNodeStartedEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
raise ValueError("workflow run not initialized.")

workflow_node_execution = self._handle_node_execution_start(workflow_run=workflow_run, event=event)

Expand Down Expand Up @@ -308,45 +310,45 @@ def _process_stream_response(

elif isinstance(event, QueueParallelBranchRunStartedEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
raise ValueError("workflow run not initialized.")

yield self._workflow_parallel_branch_start_to_stream_response(
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
)
elif isinstance(event, QueueParallelBranchRunSucceededEvent | QueueParallelBranchRunFailedEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
raise ValueError("workflow run not initialized.")

yield self._workflow_parallel_branch_finished_to_stream_response(
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
)
elif isinstance(event, QueueIterationStartEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
raise ValueError("workflow run not initialized.")

yield self._workflow_iteration_start_to_stream_response(
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
)
elif isinstance(event, QueueIterationNextEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
raise ValueError("workflow run not initialized.")

yield self._workflow_iteration_next_to_stream_response(
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
)
elif isinstance(event, QueueIterationCompletedEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
raise ValueError("workflow run not initialized.")

yield self._workflow_iteration_completed_to_stream_response(
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
)
elif isinstance(event, QueueWorkflowSucceededEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
raise ValueError("workflow run not initialized.")

if not graph_runtime_state:
raise Exception("Graph runtime state not initialized.")
raise ValueError("graph runtime state not initialized.")

workflow_run = self._handle_workflow_run_success(
workflow_run=workflow_run,
Expand All @@ -366,10 +368,10 @@ def _process_stream_response(
)
elif isinstance(event, QueueWorkflowPartialSuccessEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
raise ValueError("workflow run not initialized.")

if not graph_runtime_state:
raise Exception("Graph runtime state not initialized.")
raise ValueError("graph runtime state not initialized.")

workflow_run = self._handle_workflow_run_partial_success(
workflow_run=workflow_run,
Expand All @@ -390,10 +392,10 @@ def _process_stream_response(
)
elif isinstance(event, QueueWorkflowFailedEvent | QueueStopEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
raise ValueError("workflow run not initialized.")

if not graph_runtime_state:
raise Exception("Graph runtime state not initialized.")
raise ValueError("graph runtime state not initialized.")
workflow_run = self._handle_workflow_run_failed(
workflow_run=workflow_run,
start_at=graph_runtime_state.start_at,
Expand Down
Loading