From 204d3146a058bed63d4c8afe27776496ad2d5932 Mon Sep 17 00:00:00 2001 From: Novice Lee Date: Fri, 20 Dec 2024 16:45:57 +0800 Subject: [PATCH 01/11] fix: remove the unused retry index field --- api/controllers/console/app/workflow.py | 4 +- .../advanced_chat/generate_task_pipeline.py | 31 +++++----- .../apps/workflow/generate_task_pipeline.py | 32 +++++----- api/core/app/apps/workflow_app_runner.py | 62 ++++++++++--------- api/core/app/entities/queue_entities.py | 19 +----- .../task_pipeline/workflow_cycle_manage.py | 5 +- api/core/helper/ssrf_proxy.py | 6 +- .../workflow/graph_engine/entities/event.py | 4 +- .../workflow/graph_engine/graph_engine.py | 7 ++- api/core/workflow/nodes/event/event.py | 2 +- .../workflow/nodes/http_request/executor.py | 2 + api/fields/workflow_run_fields.py | 4 ++ ...dd_retry_index_field_to_node_execution_.py | 33 ---------- api/models/workflow.py | 1 - 14 files changed, 88 insertions(+), 124 deletions(-) delete mode 100644 api/migrations/versions/2024_12_20_0628-e1944c35e15e_add_retry_index_field_to_node_execution_.py diff --git a/api/controllers/console/app/workflow.py b/api/controllers/console/app/workflow.py index f228c3ec4a0e07..58393a978da928 100644 --- a/api/controllers/console/app/workflow.py +++ b/api/controllers/console/app/workflow.py @@ -15,7 +15,7 @@ from core.app.entities.app_invoke_entities import InvokeFrom from factories import variable_factory from fields.workflow_fields import workflow_fields -from fields.workflow_run_fields import workflow_run_node_execution_fields +from fields.workflow_run_fields import single_step_node_execution_fields from libs import helper from libs.helper import TimestampField, uuid_value from libs.login import current_user, login_required @@ -285,7 +285,7 @@ class DraftWorkflowNodeRunApi(Resource): @login_required @account_initialization_required @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW]) - @marshal_with(workflow_run_node_execution_fields) + @marshal_with(single_step_node_execution_fields) def post(self, app_model: App, node_id: str): """ Run draft workflow node diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py index ce0e95962772ad..8e1731b31439eb 100644 --- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py +++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py @@ -291,6 +291,22 @@ def _process_stream_response( yield self._workflow_start_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_run=workflow_run ) + elif isinstance( + event, + QueueNodeRetryEvent, + ): + workflow_node_execution = self._handle_workflow_node_execution_retried( + workflow_run=workflow_run, event=event + ) + + response = self._workflow_node_retry_to_stream_response( + event=event, + task_id=self._application_generate_entity.task_id, + workflow_node_execution=workflow_node_execution, + ) + + if response: + yield response elif isinstance(event, QueueNodeStartedEvent): if not workflow_run: raise Exception("Workflow run not initialized.") @@ -331,22 +347,7 @@ def _process_stream_response( if response: yield response - elif isinstance( - event, - QueueNodeRetryEvent, - ): - workflow_node_execution = self._handle_workflow_node_execution_retried( - workflow_run=workflow_run, event=event - ) - response = self._workflow_node_retry_to_stream_response( - event=event, - task_id=self._application_generate_entity.task_id, - workflow_node_execution=workflow_node_execution, - ) - - if response: - yield response elif 
isinstance(event, QueueParallelBranchRunStartedEvent): if not workflow_run: raise Exception("Workflow run not initialized.") diff --git a/api/core/app/apps/workflow/generate_task_pipeline.py b/api/core/app/apps/workflow/generate_task_pipeline.py index 79e5e2bcb96d60..b129904efbfd8c 100644 --- a/api/core/app/apps/workflow/generate_task_pipeline.py +++ b/api/core/app/apps/workflow/generate_task_pipeline.py @@ -254,6 +254,22 @@ def _process_stream_response( yield self._workflow_start_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_run=workflow_run ) + elif isinstance( + event, + QueueNodeRetryEvent, + ): + workflow_node_execution = self._handle_workflow_node_execution_retried( + workflow_run=workflow_run, event=event + ) + + response = self._workflow_node_retry_to_stream_response( + event=event, + task_id=self._application_generate_entity.task_id, + workflow_node_execution=workflow_node_execution, + ) + + if response: + yield response elif isinstance(event, QueueNodeStartedEvent): if not workflow_run: raise Exception("Workflow run not initialized.") @@ -289,22 +305,6 @@ def _process_stream_response( ) if node_failed_response: yield node_failed_response - elif isinstance( - event, - QueueNodeRetryEvent, - ): - workflow_node_execution = self._handle_workflow_node_execution_retried( - workflow_run=workflow_run, event=event - ) - - response = self._workflow_node_retry_to_stream_response( - event=event, - task_id=self._application_generate_entity.task_id, - workflow_node_execution=workflow_node_execution, - ) - - if response: - yield response elif isinstance(event, QueueParallelBranchRunStartedEvent): if not workflow_run: diff --git a/api/core/app/apps/workflow_app_runner.py b/api/core/app/apps/workflow_app_runner.py index 2fbf711175aab5..bf3509c7a0651c 100644 --- a/api/core/app/apps/workflow_app_runner.py +++ b/api/core/app/apps/workflow_app_runner.py @@ -188,6 +188,38 @@ def _handle_event(self, workflow_entry: WorkflowEntry, event: GraphEngineEvent) ) elif isinstance(event, GraphRunFailedEvent): self._publish_event(QueueWorkflowFailedEvent(error=event.error, exceptions_count=event.exceptions_count)) + elif isinstance(event, NodeRunRetryEvent): + self._publish_event( + QueueNodeRetryEvent( + node_execution_id=event.id, + node_id=event.node_id, + node_type=event.node_type, + node_data=event.node_data, + parallel_id=event.parallel_id, + parallel_start_node_id=event.parallel_start_node_id, + parent_parallel_id=event.parent_parallel_id, + parent_parallel_start_node_id=event.parent_parallel_start_node_id, + start_at=event.start_at, + node_run_index=event.node_run_index, + predecessor_node_id=event.predecessor_node_id, + in_iteration_id=event.in_iteration_id, + parallel_mode_run_id=event.parallel_mode_run_id, + inputs=event.route_node_state.node_run_result.inputs + if event.route_node_state.node_run_result + else {}, + process_data=event.route_node_state.node_run_result.process_data + if event.route_node_state.node_run_result + else {}, + outputs=event.route_node_state.node_run_result.outputs + if event.route_node_state.node_run_result + else {}, + error=event.error, + execution_metadata=event.route_node_state.node_run_result.metadata + if event.route_node_state.node_run_result + else {}, + retry_index=event.retry_index, + ) + ) elif isinstance(event, NodeRunStartedEvent): self._publish_event( QueueNodeStartedEvent( @@ -422,36 +454,6 @@ def _handle_event(self, workflow_entry: WorkflowEntry, event: GraphEngineEvent) error=event.error if isinstance(event, 
IterationRunFailedEvent) else None, ) ) - elif isinstance(event, NodeRunRetryEvent): - self._publish_event( - QueueNodeRetryEvent( - node_execution_id=event.id, - node_id=event.node_id, - node_type=event.node_type, - node_data=event.node_data, - parallel_id=event.parallel_id, - parallel_start_node_id=event.parallel_start_node_id, - parent_parallel_id=event.parent_parallel_id, - parent_parallel_start_node_id=event.parent_parallel_start_node_id, - start_at=event.start_at, - inputs=event.route_node_state.node_run_result.inputs - if event.route_node_state.node_run_result - else {}, - process_data=event.route_node_state.node_run_result.process_data - if event.route_node_state.node_run_result - else {}, - outputs=event.route_node_state.node_run_result.outputs - if event.route_node_state.node_run_result - else {}, - error=event.error, - execution_metadata=event.route_node_state.node_run_result.metadata - if event.route_node_state.node_run_result - else {}, - in_iteration_id=event.in_iteration_id, - retry_index=event.retry_index, - start_index=event.start_index, - ) - ) def get_workflow(self, app_model: App, workflow_id: str) -> Optional[Workflow]: """ diff --git a/api/core/app/entities/queue_entities.py b/api/core/app/entities/queue_entities.py index 49b7e80246a3f2..3c9f05de5bbae9 100644 --- a/api/core/app/entities/queue_entities.py +++ b/api/core/app/entities/queue_entities.py @@ -314,27 +314,11 @@ class QueueNodeSucceededEvent(AppQueueEvent): iteration_duration_map: Optional[dict[str, float]] = None -class QueueNodeRetryEvent(AppQueueEvent): +class QueueNodeRetryEvent(QueueNodeStartedEvent): """QueueNodeRetryEvent entity""" event: QueueEvent = QueueEvent.RETRY - node_execution_id: str - node_id: str - node_type: NodeType - node_data: BaseNodeData - parallel_id: Optional[str] = None - """parallel id if node is in parallel""" - parallel_start_node_id: Optional[str] = None - """parallel start node id if node is in parallel""" - parent_parallel_id: Optional[str] = None - """parent parallel id if node is in parallel""" - parent_parallel_start_node_id: Optional[str] = None - """parent parallel start node id if node is in parallel""" - in_iteration_id: Optional[str] = None - """iteration id if node is in iteration""" - start_at: datetime - inputs: Optional[dict[str, Any]] = None process_data: Optional[dict[str, Any]] = None outputs: Optional[dict[str, Any]] = None @@ -342,7 +326,6 @@ class QueueNodeRetryEvent(AppQueueEvent): error: str retry_index: int # retry index - start_index: int # start index class QueueNodeInIterationFailedEvent(AppQueueEvent): diff --git a/api/core/app/task_pipeline/workflow_cycle_manage.py b/api/core/app/task_pipeline/workflow_cycle_manage.py index e2fa12b1cddd70..951fef1fa10ce1 100644 --- a/api/core/app/task_pipeline/workflow_cycle_manage.py +++ b/api/core/app/task_pipeline/workflow_cycle_manage.py @@ -445,6 +445,7 @@ def _handle_workflow_node_execution_retried( workflow_node_execution.workflow_id = workflow_run.workflow_id workflow_node_execution.triggered_from = WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN.value workflow_node_execution.workflow_run_id = workflow_run.id + workflow_node_execution.predecessor_node_id = event.predecessor_node_id workflow_node_execution.node_execution_id = event.node_execution_id workflow_node_execution.node_id = event.node_id workflow_node_execution.node_type = event.node_type.value @@ -461,9 +462,11 @@ def _handle_workflow_node_execution_retried( workflow_node_execution.execution_metadata = json.dumps( { NodeRunMetadataKey.ITERATION_ID: 
event.in_iteration_id, + NodeRunMetadataKey.PARALLEL_MODE_RUN_ID: event.parallel_mode_run_id, + NodeRunMetadataKey.ITERATION_ID: event.in_iteration_id, } ) - workflow_node_execution.index = event.start_index + workflow_node_execution.index = event.node_run_index db.session.add(workflow_node_execution) db.session.commit() diff --git a/api/core/helper/ssrf_proxy.py b/api/core/helper/ssrf_proxy.py index d8aa80536412e6..6153becc2ab2b9 100644 --- a/api/core/helper/ssrf_proxy.py +++ b/api/core/helper/ssrf_proxy.py @@ -63,13 +63,15 @@ def make_request(method, url, max_retries=SSRF_DEFAULT_MAX_RETRIES, **kwargs): logging.warning(f"Received status code {response.status_code} for URL {url} which is in the force list") except httpx.RequestError as e: + if max_retries == 0: + raise logging.warning(f"Request to URL {url} failed on attempt {retries + 1}: {e}") retries += 1 if retries <= max_retries: time.sleep(BACKOFF_FACTOR * (2 ** (retries - 1))) - - raise MaxRetriesExceededError(f"Reached maximum retries ({max_retries}) for URL {url}") + if max_retries != 0: + raise MaxRetriesExceededError(f"Reached maximum retries ({max_retries}) for URL {url}") def get(url, max_retries=SSRF_DEFAULT_MAX_RETRIES, **kwargs): diff --git a/api/core/workflow/graph_engine/entities/event.py b/api/core/workflow/graph_engine/entities/event.py index 999715316494fb..396b10747fc8f4 100644 --- a/api/core/workflow/graph_engine/entities/event.py +++ b/api/core/workflow/graph_engine/entities/event.py @@ -97,11 +97,11 @@ class NodeInIterationFailedEvent(BaseNodeEvent): error: str = Field(..., description="error") -class NodeRunRetryEvent(BaseNodeEvent): +class NodeRunRetryEvent(NodeRunStartedEvent): error: str = Field(..., description="error") retry_index: int = Field(..., description="which retry attempt is about to be performed") start_at: datetime = Field(..., description="retry start time") - start_index: int = Field(..., description="retry start index") + node_run_index: int = Field(..., description="retry run index") ########################################### diff --git a/api/core/workflow/graph_engine/graph_engine.py b/api/core/workflow/graph_engine/graph_engine.py index e292b099684b7b..2bb74babe70c4a 100644 --- a/api/core/workflow/graph_engine/graph_engine.py +++ b/api/core/workflow/graph_engine/graph_engine.py @@ -649,14 +649,15 @@ def _run_node( node_type=node_instance.node_type, node_data=node_instance.node_data, route_node_state=route_node_state, - error=run_result.error, - retry_index=retries, + predecessor_node_id=node_instance.previous_node_id, parallel_id=parallel_id, parallel_start_node_id=parallel_start_node_id, parent_parallel_id=parent_parallel_id, parent_parallel_start_node_id=parent_parallel_start_node_id, + error=run_result.error, + retry_index=retries, start_at=retry_start_at, - start_index=self.graph_runtime_state.node_run_steps, + node_run_index=self.graph_runtime_state.node_run_steps, ) time.sleep(retry_interval) continue diff --git a/api/core/workflow/nodes/event/event.py b/api/core/workflow/nodes/event/event.py index 0dc35e7d7740ef..65147bc40686a6 100644 --- a/api/core/workflow/nodes/event/event.py +++ b/api/core/workflow/nodes/event/event.py @@ -46,7 +46,7 @@ class SingleStepRetryEvent(BaseModel): inputs: dict | None = Field(..., description="input") error: str = Field(..., description="error") - outputs: dict = Field(..., description="output") + outputs: dict | None = Field(..., description="output") retry_index: int = Field(..., description="Retry attempt number") error: str = Field(..., 
description="error") elapsed_time: float = Field(..., description="elapsed time") diff --git a/api/core/workflow/nodes/http_request/executor.py b/api/core/workflow/nodes/http_request/executor.py index 92f190091badeb..038197317c7722 100644 --- a/api/core/workflow/nodes/http_request/executor.py +++ b/api/core/workflow/nodes/http_request/executor.py @@ -251,6 +251,8 @@ def _do_http_request(self, headers: dict[str, Any]) -> httpx.Response: response = getattr(ssrf_proxy, self.method)(**request_args) except ssrf_proxy.MaxRetriesExceededError as e: raise HttpRequestNodeError(str(e)) + except httpx.RequestError as e: + raise HttpRequestNodeError(str(e)) return response def invoke(self) -> Response: diff --git a/api/fields/workflow_run_fields.py b/api/fields/workflow_run_fields.py index 7c01ffc2c62e39..99c8dc100479fc 100644 --- a/api/fields/workflow_run_fields.py +++ b/api/fields/workflow_run_fields.py @@ -112,6 +112,10 @@ "created_by_account": fields.Nested(simple_account_fields, attribute="created_by_account", allow_null=True), "created_by_end_user": fields.Nested(simple_end_user_fields, attribute="created_by_end_user", allow_null=True), "finished_at": TimestampField, +} + +single_step_node_execution_fields = { + **workflow_run_node_execution_fields, "retry_events": fields.List(fields.Nested(retry_event_field)), } diff --git a/api/migrations/versions/2024_12_20_0628-e1944c35e15e_add_retry_index_field_to_node_execution_.py b/api/migrations/versions/2024_12_20_0628-e1944c35e15e_add_retry_index_field_to_node_execution_.py deleted file mode 100644 index 3254c23c96192d..00000000000000 --- a/api/migrations/versions/2024_12_20_0628-e1944c35e15e_add_retry_index_field_to_node_execution_.py +++ /dev/null @@ -1,33 +0,0 @@ -"""add retry_index field to node-execution model - -Revision ID: e1944c35e15e -Revises: 11b07f66c737 -Create Date: 2024-12-20 06:28:30.287197 - -""" -from alembic import op -import models as models -import sqlalchemy as sa - - -# revision identifiers, used by Alembic. -revision = 'e1944c35e15e' -down_revision = '11b07f66c737' -branch_labels = None -depends_on = None - - -def upgrade(): - # ### commands auto generated by Alembic - please adjust! ### - with op.batch_alter_table('workflow_node_executions', schema=None) as batch_op: - batch_op.add_column(sa.Column('retry_index', sa.Integer(), server_default=sa.text('0'), nullable=True)) - - # ### end Alembic commands ### - - -def downgrade(): - # ### commands auto generated by Alembic - please adjust! 
### - with op.batch_alter_table('workflow_node_executions', schema=None) as batch_op: - batch_op.drop_column('retry_index') - - # ### end Alembic commands ### diff --git a/api/models/workflow.py b/api/models/workflow.py index e933382a841a6f..51a6fbc8c8740e 100644 --- a/api/models/workflow.py +++ b/api/models/workflow.py @@ -640,7 +640,6 @@ class WorkflowNodeExecution(db.Model): created_by_role = db.Column(db.String(255), nullable=False) created_by = db.Column(StringUUID, nullable=False) finished_at = db.Column(db.DateTime) - retry_index = db.Column(db.Integer, server_default=db.text("0")) @property def created_by_account(self): From 280cc679bc88089f2b18ebd9c1c79d7979e9309e Mon Sep 17 00:00:00 2001 From: Novice Lee Date: Fri, 20 Dec 2024 17:59:15 +0800 Subject: [PATCH 02/11] fix: Add missing information for step-by-step debugging --- api/core/workflow/nodes/event/event.py | 8 +------- api/fields/workflow_run_fields.py | 10 ++++++---- api/services/workflow_service.py | 16 ++++++---------- 3 files changed, 13 insertions(+), 21 deletions(-) diff --git a/api/core/workflow/nodes/event/event.py b/api/core/workflow/nodes/event/event.py index 65147bc40686a6..137b47655102af 100644 --- a/api/core/workflow/nodes/event/event.py +++ b/api/core/workflow/nodes/event/event.py @@ -39,15 +39,9 @@ class RunRetryEvent(BaseModel): start_at: datetime = Field(..., description="Retry start time") -class SingleStepRetryEvent(BaseModel): +class SingleStepRetryEvent(NodeRunResult): """Single step retry event""" status: str = WorkflowNodeExecutionStatus.RETRY.value - inputs: dict | None = Field(..., description="input") - error: str = Field(..., description="error") - outputs: dict | None = Field(..., description="output") - retry_index: int = Field(..., description="Retry attempt number") - error: str = Field(..., description="error") elapsed_time: float = Field(..., description="elapsed time") - execution_metadata: dict | None = Field(..., description="execution metadata") diff --git a/api/fields/workflow_run_fields.py b/api/fields/workflow_run_fields.py index 99c8dc100479fc..caca88a9b7ffb0 100644 --- a/api/fields/workflow_run_fields.py +++ b/api/fields/workflow_run_fields.py @@ -82,13 +82,15 @@ } retry_event_field = { - "error": fields.String, - "retry_index": fields.Integer, - "inputs": fields.Raw(attribute="inputs"), "elapsed_time": fields.Float, - "execution_metadata": fields.Raw(attribute="execution_metadata_dict"), "status": fields.String, + "inputs": fields.Raw(attribute="inputs"), + "process_data": fields.Raw(attribute="process_data"), "outputs": fields.Raw(attribute="outputs"), + "metadata": fields.Raw(attribute="metadata"), + "llm_usage": fields.Raw(attribute="llm_usage"), + "error": fields.String, + "retry_index": fields.Integer, } diff --git a/api/services/workflow_service.py b/api/services/workflow_service.py index ead552d6c2e83e..baa69b3d8dc8d7 100644 --- a/api/services/workflow_service.py +++ b/api/services/workflow_service.py @@ -268,18 +268,14 @@ def run_draft_workflow_node( node_run_result.retry_index = retries retry_events.append( SingleStepRetryEvent( - inputs=WorkflowEntry.handle_special_values(node_run_result.inputs) - if node_run_result.inputs - else None, + elapsed_time=time.perf_counter() - retry_start_at, + inputs=WorkflowEntry.handle_special_values(node_run_result.inputs), + process_data=WorkflowEntry.handle_special_values(node_run_result.process_data), + outputs=WorkflowEntry.handle_special_values(node_run_result.outputs), + metadata=node_run_result.metadata, + 
llm_usage=node_run_result.llm_usage, error=node_run_result.error, - outputs=WorkflowEntry.handle_special_values(node_run_result.outputs) - if node_run_result.outputs - else None, retry_index=node_run_result.retry_index, - elapsed_time=time.perf_counter() - retry_start_at, - execution_metadata=WorkflowEntry.handle_special_values(node_run_result.metadata) - if node_run_result.metadata - else None, ) ) time.sleep(retry_interval) From bb7d8ebadcc3e759aef1acce80e18659070732de Mon Sep 17 00:00:00 2001 From: Novice Lee Date: Fri, 20 Dec 2024 18:42:12 +0800 Subject: [PATCH 03/11] fix: add missing import --- api/models/model.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/models/model.py b/api/models/model.py index 8608b12af1fc8e..f484acde78e3fc 100644 --- a/api/models/model.py +++ b/api/models/model.py @@ -18,7 +18,7 @@ from core.file.tool_file_parser import ToolFileParser from libs.helper import generate_string from models.enums import CreatedByRole -from models.workflow import WorkflowRunStatus +from models.workflow import Workflow, WorkflowRunStatus from .account import Account, Tenant from .engine import db From 3e48f54f4e91372e6aff395af8ebfe00bc961183 Mon Sep 17 00:00:00 2001 From: Novice Lee Date: Mon, 23 Dec 2024 10:27:54 +0800 Subject: [PATCH 04/11] fix: remove the single step retry --- api/controllers/console/app/workflow.py | 4 +- api/core/helper/ssrf_proxy.py | 6 +- api/fields/workflow_run_fields.py | 5 - api/services/workflow_service.py | 137 +++++++++--------------- 4 files changed, 53 insertions(+), 99 deletions(-) diff --git a/api/controllers/console/app/workflow.py b/api/controllers/console/app/workflow.py index 58393a978da928..f228c3ec4a0e07 100644 --- a/api/controllers/console/app/workflow.py +++ b/api/controllers/console/app/workflow.py @@ -15,7 +15,7 @@ from core.app.entities.app_invoke_entities import InvokeFrom from factories import variable_factory from fields.workflow_fields import workflow_fields -from fields.workflow_run_fields import single_step_node_execution_fields +from fields.workflow_run_fields import workflow_run_node_execution_fields from libs import helper from libs.helper import TimestampField, uuid_value from libs.login import current_user, login_required @@ -285,7 +285,7 @@ class DraftWorkflowNodeRunApi(Resource): @login_required @account_initialization_required @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW]) - @marshal_with(single_step_node_execution_fields) + @marshal_with(workflow_run_node_execution_fields) def post(self, app_model: App, node_id: str): """ Run draft workflow node diff --git a/api/core/helper/ssrf_proxy.py b/api/core/helper/ssrf_proxy.py index 6153becc2ab2b9..425b3535c43752 100644 --- a/api/core/helper/ssrf_proxy.py +++ b/api/core/helper/ssrf_proxy.py @@ -45,6 +45,7 @@ def make_request(method, url, max_retries=SSRF_DEFAULT_MAX_RETRIES, **kwargs): ) retries = 0 + stream = kwargs.pop("stream", False) while retries <= max_retries: try: if dify_config.SSRF_PROXY_ALL_URL: @@ -63,15 +64,14 @@ def make_request(method, url, max_retries=SSRF_DEFAULT_MAX_RETRIES, **kwargs): logging.warning(f"Received status code {response.status_code} for URL {url} which is in the force list") except httpx.RequestError as e: + logging.warning(f"Request to URL {url} failed on attempt {retries + 1}: {e}") if max_retries == 0: raise - logging.warning(f"Request to URL {url} failed on attempt {retries + 1}: {e}") retries += 1 if retries <= max_retries: time.sleep(BACKOFF_FACTOR * (2 ** (retries - 1))) - if max_retries != 0: - raise 
MaxRetriesExceededError(f"Reached maximum retries ({max_retries}) for URL {url}") + raise MaxRetriesExceededError(f"Reached maximum retries ({max_retries}) for URL {url}") def get(url, max_retries=SSRF_DEFAULT_MAX_RETRIES, **kwargs): diff --git a/api/fields/workflow_run_fields.py b/api/fields/workflow_run_fields.py index caca88a9b7ffb0..74fdf8bd97b23a 100644 --- a/api/fields/workflow_run_fields.py +++ b/api/fields/workflow_run_fields.py @@ -116,11 +116,6 @@ "finished_at": TimestampField, } -single_step_node_execution_fields = { - **workflow_run_node_execution_fields, - "retry_events": fields.List(fields.Nested(retry_event_field)), -} - workflow_run_node_execution_list_fields = { "data": fields.List(fields.Nested(workflow_run_node_execution_fields)), } diff --git a/api/services/workflow_service.py b/api/services/workflow_service.py index baa69b3d8dc8d7..84768d5af053e4 100644 --- a/api/services/workflow_service.py +++ b/api/services/workflow_service.py @@ -15,7 +15,6 @@ from core.workflow.nodes.base.node import BaseNode from core.workflow.nodes.enums import ErrorStrategy from core.workflow.nodes.event import RunCompletedEvent -from core.workflow.nodes.event.event import SingleStepRetryEvent from core.workflow.nodes.node_mapping import LATEST_VERSION, NODE_TYPE_CLASSES_MAPPING from core.workflow.workflow_entry import WorkflowEntry from events.app_event import app_draft_workflow_was_synced, app_published_workflow_was_updated @@ -221,95 +220,56 @@ def run_draft_workflow_node( # run draft workflow node start_at = time.perf_counter() - retries = 0 - max_retries = 0 - should_retry = True - retry_events = [] try: - while retries <= max_retries and should_retry: - retry_start_at = time.perf_counter() - node_instance, generator = WorkflowEntry.single_step_run( - workflow=draft_workflow, - node_id=node_id, - user_inputs=user_inputs, - user_id=account.id, - ) - node_instance = cast(BaseNode[BaseNodeData], node_instance) - max_retries = ( - node_instance.node_data.retry_config.max_retries if node_instance.node_data.retry_config else 0 - ) - retry_interval = node_instance.node_data.retry_config.retry_interval_seconds - node_run_result: NodeRunResult | None = None - for event in generator: - if isinstance(event, RunCompletedEvent): - node_run_result = event.run_result - - # sign output files - node_run_result.outputs = WorkflowEntry.handle_special_values(node_run_result.outputs) - break - - if not node_run_result: - raise ValueError("Node run failed with no run result") - # single step debug mode error handling return - if node_run_result.status == WorkflowNodeExecutionStatus.FAILED: - if ( - retries == max_retries - and node_instance.node_type == NodeType.HTTP_REQUEST - and node_run_result.outputs - and not node_instance.should_continue_on_error - ): - node_run_result.status = WorkflowNodeExecutionStatus.SUCCEEDED - should_retry = False - else: - if node_instance.should_retry: - node_run_result.status = WorkflowNodeExecutionStatus.RETRY - retries += 1 - node_run_result.retry_index = retries - retry_events.append( - SingleStepRetryEvent( - elapsed_time=time.perf_counter() - retry_start_at, - inputs=WorkflowEntry.handle_special_values(node_run_result.inputs), - process_data=WorkflowEntry.handle_special_values(node_run_result.process_data), - outputs=WorkflowEntry.handle_special_values(node_run_result.outputs), - metadata=node_run_result.metadata, - llm_usage=node_run_result.llm_usage, - error=node_run_result.error, - retry_index=node_run_result.retry_index, - ) - ) - time.sleep(retry_interval) - else: - 
should_retry = False - if node_instance.should_continue_on_error: - node_error_args = { - "status": WorkflowNodeExecutionStatus.EXCEPTION, - "error": node_run_result.error, - "inputs": node_run_result.inputs, - "metadata": {"error_strategy": node_instance.node_data.error_strategy}, - } - if node_instance.node_data.error_strategy is ErrorStrategy.DEFAULT_VALUE: - node_run_result = NodeRunResult( - **node_error_args, - outputs={ - **node_instance.node_data.default_value_dict, - "error_message": node_run_result.error, - "error_type": node_run_result.error_type, - }, - ) - else: - node_run_result = NodeRunResult( - **node_error_args, - outputs={ - "error_message": node_run_result.error, - "error_type": node_run_result.error_type, - }, - ) - run_succeeded = node_run_result.status in ( - WorkflowNodeExecutionStatus.SUCCEEDED, - WorkflowNodeExecutionStatus.EXCEPTION, - ) - error = node_run_result.error if not run_succeeded else None + node_instance, generator = WorkflowEntry.single_step_run( + workflow=draft_workflow, + node_id=node_id, + user_inputs=user_inputs, + user_id=account.id, + ) + node_instance = cast(BaseNode[BaseNodeData], node_instance) + node_run_result: NodeRunResult | None = None + for event in generator: + if isinstance(event, RunCompletedEvent): + node_run_result = event.run_result + + # sign output files + node_run_result.outputs = WorkflowEntry.handle_special_values(node_run_result.outputs) + break + + if not node_run_result: + raise ValueError("Node run failed with no run result") + # single step debug mode error handling return + if node_run_result.status == WorkflowNodeExecutionStatus.FAILED and node_instance.should_continue_on_error: + node_error_args = { + "status": WorkflowNodeExecutionStatus.EXCEPTION, + "error": node_run_result.error, + "inputs": node_run_result.inputs, + "metadata": {"error_strategy": node_instance.node_data.error_strategy}, + } + if node_instance.node_data.error_strategy is ErrorStrategy.DEFAULT_VALUE: + node_run_result = NodeRunResult( + **node_error_args, + outputs={ + **node_instance.node_data.default_value_dict, + "error_message": node_run_result.error, + "error_type": node_run_result.error_type, + }, + ) + else: + node_run_result = NodeRunResult( + **node_error_args, + outputs={ + "error_message": node_run_result.error, + "error_type": node_run_result.error_type, + }, + ) + run_succeeded = node_run_result.status in ( + WorkflowNodeExecutionStatus.SUCCEEDED, + WorkflowNodeExecutionStatus.EXCEPTION, + ) + error = node_run_result.error if not run_succeeded else None except WorkflowNodeRunFailedError as e: node_instance = e.node_instance run_succeeded = False @@ -358,7 +318,6 @@ def run_draft_workflow_node( db.session.add(workflow_node_execution) db.session.commit() - workflow_node_execution.retry_events = retry_events return workflow_node_execution From ed09d39ec9e091d1b1f3f036cf7f760f69731cdf Mon Sep 17 00:00:00 2001 From: Novice Lee Date: Mon, 23 Dec 2024 10:45:40 +0800 Subject: [PATCH 05/11] fix: ruff check error --- api/core/workflow/nodes/http_request/executor.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/api/core/workflow/nodes/http_request/executor.py b/api/core/workflow/nodes/http_request/executor.py index 038197317c7722..3b7e19331993ee 100644 --- a/api/core/workflow/nodes/http_request/executor.py +++ b/api/core/workflow/nodes/http_request/executor.py @@ -249,9 +249,7 @@ def _do_http_request(self, headers: dict[str, Any]) -> httpx.Response: # request_args = {k: v for k, v in request_args.items() if v is not None} 
try: response = getattr(ssrf_proxy, self.method)(**request_args) - except ssrf_proxy.MaxRetriesExceededError as e: - raise HttpRequestNodeError(str(e)) - except httpx.RequestError as e: + except (ssrf_proxy.MaxRetriesExceededError, httpx.RequestError) as e: raise HttpRequestNodeError(str(e)) return response From 081bd20def719e1621d62f3e533e164685f0e303 Mon Sep 17 00:00:00 2001 From: -LAN- Date: Mon, 23 Dec 2024 11:49:06 +0800 Subject: [PATCH 06/11] fix: update type hinting for Workflow import in model.py Signed-off-by: -LAN- --- api/models/model.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/api/models/model.py b/api/models/model.py index ebf0c16c560b69..1417298c79c0a2 100644 --- a/api/models/model.py +++ b/api/models/model.py @@ -4,7 +4,7 @@ from collections.abc import Mapping from datetime import datetime from enum import Enum, StrEnum -from typing import Any, Literal, Optional +from typing import TYPE_CHECKING, Any, Literal, Optional import sqlalchemy as sa from flask import request @@ -18,12 +18,15 @@ from core.file.tool_file_parser import ToolFileParser from libs.helper import generate_string from models.enums import CreatedByRole -from models.workflow import Workflow, WorkflowRunStatus +from models.workflow import WorkflowRunStatus from .account import Account, Tenant from .engine import db from .types import StringUUID +if TYPE_CHECKING: + from .workflow import Workflow + class DifySetup(db.Model): __tablename__ = "dify_setups" From 8b8801d43c2a83c72c1a3f4a8c0dc8baed6aa2b5 Mon Sep 17 00:00:00 2001 From: -LAN- Date: Mon, 23 Dec 2024 11:58:36 +0800 Subject: [PATCH 07/11] feat: add migration to add and remove retry_index field in workflow_node_executions Signed-off-by: -LAN- --- ...dd_retry_index_field_to_node_execution_.py | 37 +++++++++++++++++++ ..._remove_workflow_node_executions_retry_.py | 34 +++++++++++++++++ 2 files changed, 71 insertions(+) create mode 100644 api/migrations/versions/2024_12_20_0628-e1944c35e15e_add_retry_index_field_to_node_execution_.py create mode 100644 api/migrations/versions/2024_12_23_1154-d7999dfa4aae_remove_workflow_node_executions_retry_.py diff --git a/api/migrations/versions/2024_12_20_0628-e1944c35e15e_add_retry_index_field_to_node_execution_.py b/api/migrations/versions/2024_12_20_0628-e1944c35e15e_add_retry_index_field_to_node_execution_.py new file mode 100644 index 00000000000000..814dec423c63c4 --- /dev/null +++ b/api/migrations/versions/2024_12_20_0628-e1944c35e15e_add_retry_index_field_to_node_execution_.py @@ -0,0 +1,37 @@ +"""add retry_index field to node-execution model +Revision ID: e1944c35e15e +Revises: 11b07f66c737 +Create Date: 2024-12-20 06:28:30.287197 +""" +from alembic import op +import models as models +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = 'e1944c35e15e' +down_revision = '11b07f66c737' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + + # We don't need these fields anymore, but this file is already merged into the main branch, + # so we need to keep this file for the sake of history, and this change will be reverted in the next migration. + # with op.batch_alter_table('workflow_node_executions', schema=None) as batch_op: + # batch_op.add_column(sa.Column('retry_index', sa.Integer(), server_default=sa.text('0'), nullable=True)) + + pass + + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! 
### + # with op.batch_alter_table('workflow_node_executions', schema=None) as batch_op: + # batch_op.drop_column('retry_index') + pass + + # ### end Alembic commands ### \ No newline at end of file diff --git a/api/migrations/versions/2024_12_23_1154-d7999dfa4aae_remove_workflow_node_executions_retry_.py b/api/migrations/versions/2024_12_23_1154-d7999dfa4aae_remove_workflow_node_executions_retry_.py new file mode 100644 index 00000000000000..ea129d15f7e6e6 --- /dev/null +++ b/api/migrations/versions/2024_12_23_1154-d7999dfa4aae_remove_workflow_node_executions_retry_.py @@ -0,0 +1,34 @@ +"""remove workflow_node_executions.retry_index if exists + +Revision ID: d7999dfa4aae +Revises: e1944c35e15e +Create Date: 2024-12-23 11:54:15.344543 + +""" +from alembic import op +import models as models +import sqlalchemy as sa +from sqlalchemy import inspect + + +# revision identifiers, used by Alembic. +revision = 'd7999dfa4aae' +down_revision = 'e1944c35e15e' +branch_labels = None +depends_on = None + + +def upgrade(): + # Check if column exists before attempting to remove it + conn = op.get_bind() + inspector = inspect(conn) + has_column = 'retry_index' in [col['name'] for col in inspector.get_columns('workflow_node_executions')] + + if has_column: + with op.batch_alter_table('workflow_node_executions', schema=None) as batch_op: + batch_op.drop_column('retry_index') + + +def downgrade(): + # No downgrade needed as we don't want to restore the column + pass From d4ddcda3f2bed763ae8acc73231e50c79e621900 Mon Sep 17 00:00:00 2001 From: -LAN- Date: Mon, 23 Dec 2024 12:06:56 +0800 Subject: [PATCH 08/11] refactor: streamline input handling and update type hints for event data structures Signed-off-by: -LAN- --- api/core/app/apps/workflow_app_runner.py | 54 ++++++++++--------- api/core/app/entities/queue_entities.py | 49 ++++++++--------- .../workflow/graph_engine/entities/event.py | 2 +- 3 files changed, 56 insertions(+), 49 deletions(-) diff --git a/api/core/app/apps/workflow_app_runner.py b/api/core/app/apps/workflow_app_runner.py index bf3509c7a0651c..fb71e0b1e3e06f 100644 --- a/api/core/app/apps/workflow_app_runner.py +++ b/api/core/app/apps/workflow_app_runner.py @@ -189,6 +189,17 @@ def _handle_event(self, workflow_entry: WorkflowEntry, event: GraphEngineEvent) elif isinstance(event, GraphRunFailedEvent): self._publish_event(QueueWorkflowFailedEvent(error=event.error, exceptions_count=event.exceptions_count)) elif isinstance(event, NodeRunRetryEvent): + node_run_result = event.route_node_state.node_run_result + if node_run_result: + inputs = node_run_result.inputs + process_data = node_run_result.process_data + outputs = node_run_result.outputs + execution_metadata = node_run_result.metadata + else: + inputs = {} + process_data = {} + outputs = {} + execution_metadata = {} self._publish_event( QueueNodeRetryEvent( node_execution_id=event.id, @@ -204,19 +215,11 @@ def _handle_event(self, workflow_entry: WorkflowEntry, event: GraphEngineEvent) predecessor_node_id=event.predecessor_node_id, in_iteration_id=event.in_iteration_id, parallel_mode_run_id=event.parallel_mode_run_id, - inputs=event.route_node_state.node_run_result.inputs - if event.route_node_state.node_run_result - else {}, - process_data=event.route_node_state.node_run_result.process_data - if event.route_node_state.node_run_result - else {}, - outputs=event.route_node_state.node_run_result.outputs - if event.route_node_state.node_run_result - else {}, + inputs=inputs, + process_data=process_data, + outputs=outputs, error=event.error, - 
execution_metadata=event.route_node_state.node_run_result.metadata - if event.route_node_state.node_run_result - else {}, + execution_metadata=execution_metadata, retry_index=event.retry_index, ) ) @@ -239,6 +242,17 @@ def _handle_event(self, workflow_entry: WorkflowEntry, event: GraphEngineEvent) ) ) elif isinstance(event, NodeRunSucceededEvent): + node_run_result = event.route_node_state.node_run_result + if node_run_result: + inputs = node_run_result.inputs + process_data = node_run_result.process_data + outputs = node_run_result.outputs + execution_metadata = node_run_result.metadata + else: + inputs = {} + process_data = {} + outputs = {} + execution_metadata = {} self._publish_event( QueueNodeSucceededEvent( node_execution_id=event.id, @@ -250,18 +264,10 @@ def _handle_event(self, workflow_entry: WorkflowEntry, event: GraphEngineEvent) parent_parallel_id=event.parent_parallel_id, parent_parallel_start_node_id=event.parent_parallel_start_node_id, start_at=event.route_node_state.start_at, - inputs=event.route_node_state.node_run_result.inputs - if event.route_node_state.node_run_result - else {}, - process_data=event.route_node_state.node_run_result.process_data - if event.route_node_state.node_run_result - else {}, - outputs=event.route_node_state.node_run_result.outputs - if event.route_node_state.node_run_result - else {}, - execution_metadata=event.route_node_state.node_run_result.metadata - if event.route_node_state.node_run_result - else {}, + inputs=inputs, + process_data=process_data, + outputs=outputs, + execution_metadata=execution_metadata, in_iteration_id=event.in_iteration_id, ) ) diff --git a/api/core/app/entities/queue_entities.py b/api/core/app/entities/queue_entities.py index 3c9f05de5bbae9..d73c2eb53bfcd7 100644 --- a/api/core/app/entities/queue_entities.py +++ b/api/core/app/entities/queue_entities.py @@ -1,3 +1,4 @@ +from collections.abc import Mapping from datetime import datetime from enum import Enum, StrEnum from typing import Any, Optional @@ -85,9 +86,9 @@ class QueueIterationStartEvent(AppQueueEvent): start_at: datetime node_run_index: int - inputs: Optional[dict[str, Any]] = None + inputs: Optional[Mapping[str, Any]] = None predecessor_node_id: Optional[str] = None - metadata: Optional[dict[str, Any]] = None + metadata: Optional[Mapping[str, Any]] = None class QueueIterationNextEvent(AppQueueEvent): @@ -139,9 +140,9 @@ class QueueIterationCompletedEvent(AppQueueEvent): start_at: datetime node_run_index: int - inputs: Optional[dict[str, Any]] = None - outputs: Optional[dict[str, Any]] = None - metadata: Optional[dict[str, Any]] = None + inputs: Optional[Mapping[str, Any]] = None + outputs: Optional[Mapping[str, Any]] = None + metadata: Optional[Mapping[str, Any]] = None steps: int = 0 error: Optional[str] = None @@ -304,9 +305,9 @@ class QueueNodeSucceededEvent(AppQueueEvent): """iteration id if node is in iteration""" start_at: datetime - inputs: Optional[dict[str, Any]] = None - process_data: Optional[dict[str, Any]] = None - outputs: Optional[dict[str, Any]] = None + inputs: Optional[Mapping[str, Any]] = None + process_data: Optional[Mapping[str, Any]] = None + outputs: Optional[Mapping[str, Any]] = None execution_metadata: Optional[dict[NodeRunMetadataKey, Any]] = None error: Optional[str] = None @@ -319,10 +320,10 @@ class QueueNodeRetryEvent(QueueNodeStartedEvent): event: QueueEvent = QueueEvent.RETRY - inputs: Optional[dict[str, Any]] = None - process_data: Optional[dict[str, Any]] = None - outputs: Optional[dict[str, Any]] = None - execution_metadata: 
Optional[dict[NodeRunMetadataKey, Any]] = None + inputs: Optional[Mapping[str, Any]] = None + process_data: Optional[Mapping[str, Any]] = None + outputs: Optional[Mapping[str, Any]] = None + execution_metadata: Optional[Mapping[NodeRunMetadataKey, Any]] = None error: str retry_index: int # retry index @@ -351,10 +352,10 @@ class QueueNodeInIterationFailedEvent(AppQueueEvent): """iteration id if node is in iteration""" start_at: datetime - inputs: Optional[dict[str, Any]] = None - process_data: Optional[dict[str, Any]] = None - outputs: Optional[dict[str, Any]] = None - execution_metadata: Optional[dict[NodeRunMetadataKey, Any]] = None + inputs: Optional[Mapping[str, Any]] = None + process_data: Optional[Mapping[str, Any]] = None + outputs: Optional[Mapping[str, Any]] = None + execution_metadata: Optional[Mapping[NodeRunMetadataKey, Any]] = None error: str @@ -382,10 +383,10 @@ class QueueNodeExceptionEvent(AppQueueEvent): """iteration id if node is in iteration""" start_at: datetime - inputs: Optional[dict[str, Any]] = None - process_data: Optional[dict[str, Any]] = None - outputs: Optional[dict[str, Any]] = None - execution_metadata: Optional[dict[NodeRunMetadataKey, Any]] = None + inputs: Optional[Mapping[str, Any]] = None + process_data: Optional[Mapping[str, Any]] = None + outputs: Optional[Mapping[str, Any]] = None + execution_metadata: Optional[Mapping[NodeRunMetadataKey, Any]] = None error: str @@ -413,10 +414,10 @@ class QueueNodeFailedEvent(AppQueueEvent): """iteration id if node is in iteration""" start_at: datetime - inputs: Optional[dict[str, Any]] = None - process_data: Optional[dict[str, Any]] = None - outputs: Optional[dict[str, Any]] = None - execution_metadata: Optional[dict[NodeRunMetadataKey, Any]] = None + inputs: Optional[Mapping[str, Any]] = None + process_data: Optional[Mapping[str, Any]] = None + outputs: Optional[Mapping[str, Any]] = None + execution_metadata: Optional[Mapping[NodeRunMetadataKey, Any]] = None error: str diff --git a/api/core/workflow/graph_engine/entities/event.py b/api/core/workflow/graph_engine/entities/event.py index 396b10747fc8f4..6a65403d2cbd16 100644 --- a/api/core/workflow/graph_engine/entities/event.py +++ b/api/core/workflow/graph_engine/entities/event.py @@ -33,7 +33,7 @@ class GraphRunSucceededEvent(BaseGraphEvent): class GraphRunFailedEvent(BaseGraphEvent): error: str = Field(..., description="failed reason") - exceptions_count: Optional[int] = Field(description="exception count", default=0) + exceptions_count: int = Field(description="exception count", default=0) class GraphRunPartialSucceededEvent(BaseGraphEvent): From cdc854d15be6c51ebdee33dc697d6c896e87afad Mon Sep 17 00:00:00 2001 From: -LAN- Date: Mon, 23 Dec 2024 12:11:30 +0800 Subject: [PATCH 09/11] fix: replace Exception with ValueError for uninitialized workflow run and graph runtime state Signed-off-by: -LAN- --- .../apps/workflow/generate_task_pipeline.py | 28 ++++++++++--------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/api/core/app/apps/workflow/generate_task_pipeline.py b/api/core/app/apps/workflow/generate_task_pipeline.py index fd84908975ff66..bd63c4ec7a88f6 100644 --- a/api/core/app/apps/workflow/generate_task_pipeline.py +++ b/api/core/app/apps/workflow/generate_task_pipeline.py @@ -218,7 +218,7 @@ def _wrapper_process_stream_response( break else: yield MessageAudioStreamResponse(audio=audio_trunk.audio, task_id=task_id) - except Exception as e: + except Exception: logger.exception(f"Fails to get audio trunk, task_id: {task_id}") break if 
tts_publisher: @@ -258,6 +258,8 @@ def _process_stream_response( event, QueueNodeRetryEvent, ): + if not workflow_run: + raise ValueError("workflow run not initialized.") workflow_node_execution = self._handle_workflow_node_execution_retried( workflow_run=workflow_run, event=event ) @@ -272,7 +274,7 @@ def _process_stream_response( yield response elif isinstance(event, QueueNodeStartedEvent): if not workflow_run: - raise Exception("Workflow run not initialized.") + raise ValueError("workflow run not initialized.") workflow_node_execution = self._handle_node_execution_start(workflow_run=workflow_run, event=event) @@ -308,45 +310,45 @@ def _process_stream_response( elif isinstance(event, QueueParallelBranchRunStartedEvent): if not workflow_run: - raise Exception("Workflow run not initialized.") + raise ValueError("workflow run not initialized.") yield self._workflow_parallel_branch_start_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event ) elif isinstance(event, QueueParallelBranchRunSucceededEvent | QueueParallelBranchRunFailedEvent): if not workflow_run: - raise Exception("Workflow run not initialized.") + raise ValueError("workflow run not initialized.") yield self._workflow_parallel_branch_finished_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event ) elif isinstance(event, QueueIterationStartEvent): if not workflow_run: - raise Exception("Workflow run not initialized.") + raise ValueError("workflow run not initialized.") yield self._workflow_iteration_start_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event ) elif isinstance(event, QueueIterationNextEvent): if not workflow_run: - raise Exception("Workflow run not initialized.") + raise ValueError("workflow run not initialized.") yield self._workflow_iteration_next_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event ) elif isinstance(event, QueueIterationCompletedEvent): if not workflow_run: - raise Exception("Workflow run not initialized.") + raise ValueError("workflow run not initialized.") yield self._workflow_iteration_completed_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event ) elif isinstance(event, QueueWorkflowSucceededEvent): if not workflow_run: - raise Exception("Workflow run not initialized.") + raise ValueError("workflow run not initialized.") if not graph_runtime_state: - raise Exception("Graph runtime state not initialized.") + raise ValueError("graph runtime state not initialized.") workflow_run = self._handle_workflow_run_success( workflow_run=workflow_run, @@ -366,10 +368,10 @@ def _process_stream_response( ) elif isinstance(event, QueueWorkflowPartialSuccessEvent): if not workflow_run: - raise Exception("Workflow run not initialized.") + raise ValueError("workflow run not initialized.") if not graph_runtime_state: - raise Exception("Graph runtime state not initialized.") + raise ValueError("graph runtime state not initialized.") workflow_run = self._handle_workflow_run_partial_success( workflow_run=workflow_run, @@ -390,10 +392,10 @@ def _process_stream_response( ) elif isinstance(event, QueueWorkflowFailedEvent | QueueStopEvent): if not workflow_run: - raise Exception("Workflow run not initialized.") + raise ValueError("workflow run not initialized.") if not graph_runtime_state: - raise Exception("Graph runtime state not 
initialized.") + raise ValueError("graph runtime state not initialized.") workflow_run = self._handle_workflow_run_failed( workflow_run=workflow_run, start_at=graph_runtime_state.start_at, From fc15b95b81ef79ddb9128b23d90d12483777abe6 Mon Sep 17 00:00:00 2001 From: -LAN- Date: Mon, 23 Dec 2024 12:14:28 +0800 Subject: [PATCH 10/11] fix: replace Exception with ValueError for uninitialized workflow run and queue listening errors Signed-off-by: -LAN- --- .../advanced_chat/generate_task_pipeline.py | 30 ++++++++++--------- .../apps/workflow/generate_task_pipeline.py | 2 +- 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py index c7bf37dd085ec5..635e482ad980ed 100644 --- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py +++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py @@ -180,7 +180,7 @@ def _to_blocking_response(self, generator: Generator[StreamResponse, None, None] else: continue - raise Exception("Queue listening stopped unexpectedly.") + raise ValueError("queue listening stopped unexpectedly.") def _to_stream_response( self, generator: Generator[StreamResponse, None, None] @@ -295,6 +295,8 @@ def _process_stream_response( event, QueueNodeRetryEvent, ): + if not workflow_run: + raise ValueError("workflow run not initialized.") workflow_node_execution = self._handle_workflow_node_execution_retried( workflow_run=workflow_run, event=event ) @@ -309,7 +311,7 @@ def _process_stream_response( yield response elif isinstance(event, QueueNodeStartedEvent): if not workflow_run: - raise Exception("Workflow run not initialized.") + raise ValueError("workflow run not initialized.") workflow_node_execution = self._handle_node_execution_start(workflow_run=workflow_run, event=event) @@ -350,45 +352,45 @@ def _process_stream_response( elif isinstance(event, QueueParallelBranchRunStartedEvent): if not workflow_run: - raise Exception("Workflow run not initialized.") + raise ValueError("workflow run not initialized.") yield self._workflow_parallel_branch_start_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event ) elif isinstance(event, QueueParallelBranchRunSucceededEvent | QueueParallelBranchRunFailedEvent): if not workflow_run: - raise Exception("Workflow run not initialized.") + raise ValueError("workflow run not initialized.") yield self._workflow_parallel_branch_finished_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event ) elif isinstance(event, QueueIterationStartEvent): if not workflow_run: - raise Exception("Workflow run not initialized.") + raise ValueError("workflow run not initialized.") yield self._workflow_iteration_start_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event ) elif isinstance(event, QueueIterationNextEvent): if not workflow_run: - raise Exception("Workflow run not initialized.") + raise ValueError("workflow run not initialized.") yield self._workflow_iteration_next_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event ) elif isinstance(event, QueueIterationCompletedEvent): if not workflow_run: - raise Exception("Workflow run not initialized.") + raise ValueError("workflow run not initialized.") yield self._workflow_iteration_completed_to_stream_response( task_id=self._application_generate_entity.task_id, 
workflow_run=workflow_run, event=event
             )
         elif isinstance(event, QueueWorkflowSucceededEvent):
             if not workflow_run:
-                raise Exception("Workflow run not initialized.")
+                raise ValueError("workflow run not initialized.")
 
             if not graph_runtime_state:
-                raise Exception("Graph runtime state not initialized.")
+                raise ValueError("graph runtime state not initialized.")
 
             workflow_run = self._handle_workflow_run_success(
                 workflow_run=workflow_run,
@@ -407,10 +409,10 @@ def _process_stream_response(
             self._queue_manager.publish(QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE)
         elif isinstance(event, QueueWorkflowPartialSuccessEvent):
             if not workflow_run:
-                raise Exception("Workflow run not initialized.")
+                raise ValueError("workflow run not initialized.")
 
             if not graph_runtime_state:
-                raise Exception("Graph runtime state not initialized.")
+                raise ValueError("graph runtime state not initialized.")
 
             workflow_run = self._handle_workflow_run_partial_success(
                 workflow_run=workflow_run,
@@ -430,10 +432,10 @@ def _process_stream_response(
             self._queue_manager.publish(QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE)
         elif isinstance(event, QueueWorkflowFailedEvent):
             if not workflow_run:
-                raise Exception("Workflow run not initialized.")
+                raise ValueError("workflow run not initialized.")
 
             if not graph_runtime_state:
-                raise Exception("Graph runtime state not initialized.")
+                raise ValueError("graph runtime state not initialized.")
 
             workflow_run = self._handle_workflow_run_failed(
                 workflow_run=workflow_run,
@@ -523,7 +525,7 @@ def _process_stream_response(
                 yield self._message_replace_to_stream_response(answer=event.text)
             elif isinstance(event, QueueAdvancedChatMessageEndEvent):
                 if not graph_runtime_state:
-                    raise Exception("Graph runtime state not initialized.")
+                    raise ValueError("graph runtime state not initialized.")
 
                 output_moderation_answer = self._handle_output_moderation_when_task_finished(self._task_state.answer)
                 if output_moderation_answer:
diff --git a/api/core/app/apps/workflow/generate_task_pipeline.py b/api/core/app/apps/workflow/generate_task_pipeline.py
index bd63c4ec7a88f6..c47b38f5600f4d 100644
--- a/api/core/app/apps/workflow/generate_task_pipeline.py
+++ b/api/core/app/apps/workflow/generate_task_pipeline.py
@@ -155,7 +155,7 @@ def _to_blocking_response(self, generator: Generator[StreamResponse, None, None]
             else:
                 continue
 
-        raise Exception("Queue listening stopped unexpectedly.")
+        raise ValueError("queue listening stopped unexpectedly.")
 
     def _to_stream_response(
         self, generator: Generator[StreamResponse, None, None]

From 6802329055ed7a766c8735fcba92b5d8f94dd251 Mon Sep 17 00:00:00 2001
From: Novice Lee
Date: Mon, 23 Dec 2024 13:56:07 +0800
Subject: [PATCH 11/11] fix: incorrect log order for node retry events

---
 api/core/app/apps/workflow_app_runner.py         | 2 +-
 api/core/workflow/graph_engine/entities/event.py | 1 -
 api/core/workflow/graph_engine/graph_engine.py   | 2 --
 3 files changed, 1 insertion(+), 4 deletions(-)

diff --git a/api/core/app/apps/workflow_app_runner.py b/api/core/app/apps/workflow_app_runner.py
index fb71e0b1e3e06f..885283504b4175 100644
--- a/api/core/app/apps/workflow_app_runner.py
+++ b/api/core/app/apps/workflow_app_runner.py
@@ -211,7 +211,7 @@ def _handle_event(self, workflow_entry: WorkflowEntry, event: GraphEngineEvent)
                     parent_parallel_id=event.parent_parallel_id,
                     parent_parallel_start_node_id=event.parent_parallel_start_node_id,
                     start_at=event.start_at,
-                    node_run_index=event.node_run_index,
+                    node_run_index=event.route_node_state.index,
predecessor_node_id=event.predecessor_node_id, in_iteration_id=event.in_iteration_id, parallel_mode_run_id=event.parallel_mode_run_id, diff --git a/api/core/workflow/graph_engine/entities/event.py b/api/core/workflow/graph_engine/entities/event.py index 6a65403d2cbd16..d591b68e7e72be 100644 --- a/api/core/workflow/graph_engine/entities/event.py +++ b/api/core/workflow/graph_engine/entities/event.py @@ -101,7 +101,6 @@ class NodeRunRetryEvent(NodeRunStartedEvent): error: str = Field(..., description="error") retry_index: int = Field(..., description="which retry attempt is about to be performed") start_at: datetime = Field(..., description="retry start time") - node_run_index: int = Field(..., description="retry run index") ########################################### diff --git a/api/core/workflow/graph_engine/graph_engine.py b/api/core/workflow/graph_engine/graph_engine.py index 2bb74babe70c4a..d7d33c65fcdb38 100644 --- a/api/core/workflow/graph_engine/graph_engine.py +++ b/api/core/workflow/graph_engine/graph_engine.py @@ -641,7 +641,6 @@ def _run_node( run_result.status = WorkflowNodeExecutionStatus.SUCCEEDED if node_instance.should_retry and retries < max_retries: retries += 1 - self.graph_runtime_state.node_run_steps += 1 route_node_state.node_run_result = run_result yield NodeRunRetryEvent( id=node_instance.id, @@ -657,7 +656,6 @@ def _run_node( error=run_result.error, retry_index=retries, start_at=retry_start_at, - node_run_index=self.graph_runtime_state.node_run_steps, ) time.sleep(retry_interval) continue
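
Taken together, patches 01, 04, and 11 settle the retry semantics: each attempt reports its own retry_index, the shared node_run_steps counter is no longer advanced on retries, the runner sleeps retry_interval_seconds between attempts, and with max_retries == 0 the original error propagates unchanged. The sketch below is a minimal, self-contained summary of that control flow; RetryConfig, run_with_retry, flaky, and the print placeholder are illustrative stand-ins, not the actual Dify classes or signatures used in the patches above.

import time
from collections.abc import Callable
from dataclasses import dataclass


@dataclass
class RetryConfig:
    """Illustrative stand-in for the node-level retry settings."""

    max_retries: int = 3
    retry_interval_seconds: float = 1.0


class MaxRetriesExceededError(Exception):
    """Raised once every attempt, initial plus retries, has failed."""


def run_with_retry(run_once: Callable[[], str], config: RetryConfig) -> str:
    retries = 0
    while retries <= config.max_retries:
        try:
            return run_once()
        except Exception as e:
            if config.max_retries == 0:
                # Retries disabled: surface the original error unchanged,
                # matching the ssrf_proxy change in patch 04.
                raise
            retries += 1
            if retries <= config.max_retries:
                # retry_index is reported per attempt; no shared step counter
                # is mutated here (the ordering fix in patch 11).
                print(f"retry attempt {retries} after error: {e}")
                time.sleep(config.retry_interval_seconds)
    raise MaxRetriesExceededError(f"Reached maximum retries ({config.max_retries})")


if __name__ == "__main__":
    attempts: list[int] = []

    def flaky() -> str:
        attempts.append(1)
        if len(attempts) < 3:
            raise RuntimeError("transient failure")
        return "ok"

    # Succeeds on the third attempt: two retry reports, then a normal result.
    print(run_with_retry(flaky, RetryConfig(max_retries=3, retry_interval_seconds=0.01)))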