Skip to content

Commit

Permalink
[xy] Retry failed db calls in scheduler. (mage-ai#1808)
Browse files Browse the repository at this point in the history
* [xy] Retry failed db calls in scheduler.

* [xy] Fix import error.
  • Loading branch information
wangxiaoyou1993 authored Jan 24, 2023
1 parent 324c3dd commit 178bee1
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 20 deletions.
3 changes: 2 additions & 1 deletion mage_ai/orchestration/db/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import os
import sqlalchemy

DB_RETRY_COUNT = 2
TEST_DB = 'test.db'

db_connection_url = os.getenv(DATABASE_CONNECTION_URL_ENV_VAR)
Expand Down Expand Up @@ -72,7 +73,7 @@ def func_with_rollback(*args, **kwargs):
return func(*args, **kwargs)
except (sqlalchemy.exc.OperationalError, sqlalchemy.exc.PendingRollbackError) as e:
db_connection.session.rollback()
if retry_count >= 1:
if retry_count >= DB_RETRY_COUNT:
raise e
retry_count += 1
return func_with_rollback
7 changes: 0 additions & 7 deletions mage_ai/orchestration/db/constants.py

This file was deleted.

23 changes: 18 additions & 5 deletions mage_ai/orchestration/db/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ def current_execution_date(self) -> datetime:
cron_itr = croniter(self.schedule_interval, now)
return cron_itr.get_prev(datetime)

@safe_db_query
def should_schedule(self) -> bool:
if self.status != self.__class__.ScheduleStatus.ACTIVE:
return False
Expand Down Expand Up @@ -331,6 +332,21 @@ def create(self, create_block_runs: bool = True, **kwargs) -> 'PipelineRun':

return pipeline_run

@classmethod
@safe_db_query
def in_progress_runs(
self,
pipeline_schedules: List[int],
):
return self.query.filter(
PipelineRun.pipeline_schedule_id.in_(pipeline_schedules),
PipelineRun.status.in_([
self.PipelineRunStatus.INITIAL,
self.PipelineRunStatus.RUNNING,
]),
PipelineRun.passed_sla.is_(False),
).all()

def create_block_run(self, block_uuid: str, **kwargs) -> 'BlockRun':
return BlockRun.create(
block_uuid=block_uuid,
Expand Down Expand Up @@ -392,15 +408,12 @@ async def logs_async(self):
).get_logs_async()

@classmethod
@safe_db_query
def batch_update_status(self, block_run_ids: List[int], status):
BlockRun.query.filter(BlockRun.id.in_(block_run_ids)).update({
BlockRun.status: status
}, synchronize_session=False)
try:
db_connection.session.commit()
except Exception as e:
db_connection.rollback()
raise e
db_connection.session.commit()

@classmethod
def get(self, pipeline_run_id: int = None, block_uuid: str = None) -> 'BlockRun':
Expand Down
8 changes: 1 addition & 7 deletions mage_ai/orchestration/pipeline_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
from mage_ai.data_preparation.models.pipelines.integration_pipeline import IntegrationPipeline
from mage_ai.data_preparation.repo_manager import get_repo_config, get_repo_path
from mage_ai.data_preparation.variable_manager import get_global_variables
from mage_ai.orchestration.db.constants import IN_PROGRESS_STATUSES
from mage_ai.orchestration.db.models import BlockRun, EventMatcher, PipelineRun, PipelineSchedule
from mage_ai.orchestration.db.process import create_process
from mage_ai.orchestration.execution_process_manager import execution_process_manager
Expand Down Expand Up @@ -665,12 +664,7 @@ def check_sla():
for s in PipelineSchedule.active_schedules(pipeline_uuids=repo_pipelines)
])

pipeline_runs = \
PipelineRun.query.filter(
PipelineRun.pipeline_schedule_id.in_(pipeline_schedules),
PipelineRun.status.in_(IN_PROGRESS_STATUSES),
PipelineRun.passed_sla.is_(False),
).all()
pipeline_runs = PipelineRun.in_progress_runs(pipeline_schedules)

if pipeline_runs:
notification_sender = NotificationSender(
Expand Down

0 comments on commit 178bee1

Please sign in to comment.