Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: remove the unused retry index field #11903

Merged
merged 12 commits into from
Dec 23, 2024
Prev Previous commit
Next Next commit
fix: replace Exception with ValueError for uninitialized workflow run…
… and queue listening errors

Signed-off-by: -LAN- <laipz8200@outlook.com>
  • Loading branch information
laipz8200 committed Dec 23, 2024
commit fc15b95b81ef79ddb9128b23d90d12483777abe6
30 changes: 16 additions & 14 deletions api/core/app/apps/advanced_chat/generate_task_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ def _to_blocking_response(self, generator: Generator[StreamResponse, None, None]
else:
continue

raise Exception("Queue listening stopped unexpectedly.")
raise ValueError("queue listening stopped unexpectedly.")

def _to_stream_response(
self, generator: Generator[StreamResponse, None, None]
Expand Down Expand Up @@ -295,6 +295,8 @@ def _process_stream_response(
event,
QueueNodeRetryEvent,
):
if not workflow_run:
raise ValueError("workflow run not initialized.")
workflow_node_execution = self._handle_workflow_node_execution_retried(
workflow_run=workflow_run, event=event
)
Expand All @@ -309,7 +311,7 @@ def _process_stream_response(
yield response
elif isinstance(event, QueueNodeStartedEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
raise ValueError("workflow run not initialized.")

workflow_node_execution = self._handle_node_execution_start(workflow_run=workflow_run, event=event)

Expand Down Expand Up @@ -350,45 +352,45 @@ def _process_stream_response(

elif isinstance(event, QueueParallelBranchRunStartedEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
raise ValueError("workflow run not initialized.")

yield self._workflow_parallel_branch_start_to_stream_response(
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
)
elif isinstance(event, QueueParallelBranchRunSucceededEvent | QueueParallelBranchRunFailedEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
raise ValueError("workflow run not initialized.")

yield self._workflow_parallel_branch_finished_to_stream_response(
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
)
elif isinstance(event, QueueIterationStartEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
raise ValueError("workflow run not initialized.")

yield self._workflow_iteration_start_to_stream_response(
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
)
elif isinstance(event, QueueIterationNextEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
raise ValueError("workflow run not initialized.")

yield self._workflow_iteration_next_to_stream_response(
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
)
elif isinstance(event, QueueIterationCompletedEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
raise ValueError("workflow run not initialized.")

yield self._workflow_iteration_completed_to_stream_response(
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
)
elif isinstance(event, QueueWorkflowSucceededEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
raise ValueError("workflow run not initialized.")

if not graph_runtime_state:
raise Exception("Graph runtime state not initialized.")
raise ValueError("workflow run not initialized.")

workflow_run = self._handle_workflow_run_success(
workflow_run=workflow_run,
Expand All @@ -407,10 +409,10 @@ def _process_stream_response(
self._queue_manager.publish(QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE)
elif isinstance(event, QueueWorkflowPartialSuccessEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
raise ValueError("workflow run not initialized.")

if not graph_runtime_state:
raise Exception("Graph runtime state not initialized.")
raise ValueError("graph runtime state not initialized.")

workflow_run = self._handle_workflow_run_partial_success(
workflow_run=workflow_run,
Expand All @@ -430,10 +432,10 @@ def _process_stream_response(
self._queue_manager.publish(QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE)
elif isinstance(event, QueueWorkflowFailedEvent):
if not workflow_run:
raise Exception("Workflow run not initialized.")
raise ValueError("workflow run not initialized.")

if not graph_runtime_state:
raise Exception("Graph runtime state not initialized.")
raise ValueError("graph runtime state not initialized.")

workflow_run = self._handle_workflow_run_failed(
workflow_run=workflow_run,
Expand Down Expand Up @@ -523,7 +525,7 @@ def _process_stream_response(
yield self._message_replace_to_stream_response(answer=event.text)
elif isinstance(event, QueueAdvancedChatMessageEndEvent):
if not graph_runtime_state:
raise Exception("Graph runtime state not initialized.")
raise ValueError("graph runtime state not initialized.")

output_moderation_answer = self._handle_output_moderation_when_task_finished(self._task_state.answer)
if output_moderation_answer:
Expand Down
2 changes: 1 addition & 1 deletion api/core/app/apps/workflow/generate_task_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ def _to_blocking_response(self, generator: Generator[StreamResponse, None, None]
else:
continue

raise Exception("Queue listening stopped unexpectedly.")
raise ValueError("queue listening stopped unexpectedly.")

def _to_stream_response(
self, generator: Generator[StreamResponse, None, None]
Expand Down
Loading