Skip to content

Commit

Permalink
Add a cron job to clean up old auxiliary model instances for MR jobs.
Browse files Browse the repository at this point in the history
  • Loading branch information
seanlip committed Sep 8, 2014
1 parent f751906 commit 7b2d568
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 1 deletion.
106 changes: 106 additions & 0 deletions core/controllers/cron.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,22 @@

"""Controllers for the cron jobs."""

import logging

from core import jobs
from core.controllers import base
from core.platform import models
email_services = models.Registry.import_email_services()
(job_models,) = models.Registry.import_models([models.NAMES.job])
import feconf
import utils

from mapreduce import main as mapreduce_main
from mapreduce import model as mapreduce_model
from mapreduce.lib.pipeline import pipeline

# The default retention timeline is 2 days.
MAX_MAPREDUCE_METADATA_RETENTION_MSECS = 2 * 24 * 60 * 60 * 1000


def require_cron(handler):
Expand Down Expand Up @@ -74,3 +85,98 @@ def get(self):

email_services.send_mail_to_admin(
feconf.ADMIN_EMAIL_ADDRESS, email_subject, email_message)


class CronMapreduceCleanupHandler(base.BaseHandler):

def get(self):
"""Clean up intermediate data items for completed or failed M/R jobs.
Map/reduce runs leave around a large number of rows in several
tables. This data is useful to have around for a while:
- it helps diagnose any problems with jobs that may be occurring
- it shows where resource usage is occurring
However, after a few days, this information is less relevant, and
should be cleaned up.
"""
self._clean_mapreduce(MAX_MAPREDUCE_METADATA_RETENTION_MSECS)

@classmethod
def _clean_mapreduce(cls, recency_msec):
"""Cleans up all MR jobs that started more than recency_msec
milliseconds ago.
"""
num_cleaned = 0

min_age_msec = recency_msec
# Only consider jobs that started at most 1 week before recency_msec.
max_age_msec = recency_msec + 7 * 24 * 60 * 60 * 1000
# The latest start time that a job scheduled for cleanup may have.
max_start_time_msec = utils.get_current_time_in_millisecs() - min_age_msec

# Get all pipeline ids from jobs that started between max_age_msecs
# and max_age_msecs + 1 week, before now.
pipeline_id_to_job_instance = {}

job_instances = job_models.JobModel.get_recent_jobs(1000, max_age_msec)
for job_instance in job_instances:
if (job_instance.time_started_msec < max_start_time_msec and not
job_instance.has_been_cleaned_up):
if 'root_pipeline_id' in job_instance.metadata:
pipeline_id = job_instance.metadata['root_pipeline_id']
pipeline_id_to_job_instance[pipeline_id] = job_instance

# Clean up pipelines.
for pline in pipeline.get_root_list()['pipelines']:
pipeline_id = pline['pipelineId']
job_definitely_terminated = (
pline['status'] == 'done' or
pline['status'] == 'aborted' or
pline['currentAttempt'] > pline['maxAttempts'])
have_start_time = 'startTimeMs' in pline
job_started_too_long_ago = (
have_start_time and
pline['startTimeMs'] < max_start_time_msec)

if (job_started_too_long_ago or
(not have_start_time and job_definitely_terminated)):
# At this point, the map/reduce pipeline is either in a
# terminal state, or has taken so long that there's no
# realistic possibility that there might be a race condition
# between this and the job actually completing.
if pipeline_id in pipeline_id_to_job_instance:
job_instance = pipeline_id_to_job_instance[pipeline_id]
job_instance.has_been_cleaned_up = True
job_instance.put()

# This enqueues a deferred cleanup item.
p = pipeline.Pipeline.from_id(pipeline_id)
if p:
p.cleanup()
num_cleaned += 1

mapreduce_state_model_entities_deleted = 0
shard_state_model_entities_deleted = 0

mapreduce_state_model_class = mapreduce_model.MapreduceState
shard_state_model_class = mapreduce_model.ShardState

for entity in mapreduce_state_model_class.all():
if (entity.result_status == 'success' and
utils.get_time_in_millisecs(entity.start_time) <
max_start_time_msec):
entity.delete()
mapreduce_state_model_entities_deleted += 1

for entity in shard_state_model_class.all():
if (entity.result_status == 'success' and
utils.get_time_in_millisecs(entity.update_time) <
max_start_time_msec):
entity.delete()
shard_state_model_entities_deleted += 1

logging.warning(
'%s MR jobs cleaned up, %s MR state entities deleted, '
'%s shard state entities deleted.' % (
num_cleaned, mapreduce_state_model_entities_deleted,
shard_state_model_entities_deleted))
2 changes: 1 addition & 1 deletion core/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ class MapReduceJobPipeline(base_handler.PipelineBase):

def run(self, job_id, job_class_str, kwargs):
job_class = mapreduce_util.for_name(job_class_str)
job_class.register_start(job_id, {
job_class.register_start(job_id, metadata={
job_class._OUTPUT_KEY_ROOT_PIPELINE_ID: self.root_pipeline_id
})

Expand Down
3 changes: 3 additions & 0 deletions core/storage/job/gae_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ def get_new_id(cls, entity_name):
# The error message, if applicable. Only populated if the job has status
# code STATUS_CODE_FAILED or STATUS_CODE_CANCELED; None otherwise.
error = ndb.TextProperty(indexed=False)
# Whether the datastore models associated with this job have been cleaned
# up (i.e., deleted).
has_been_cleaned_up = ndb.BooleanProperty(default=False, indexed=True)

@property
def is_cancelable(self):
Expand Down
3 changes: 3 additions & 0 deletions cron.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,6 @@ cron:
- description: daily email about mapreduce job statuses
url: /cron/mail/admin/job_status
schedule: every day 16:00
- description: clean up old mapreduce jobs
url: /cron/jobs/cleanup
schedule: every day 08:00
3 changes: 3 additions & 0 deletions main_cron.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
main.get_redirect_route(
r'/cron/mail/admin/job_status', cron.JobStatusMailerHandler,
'job_failure_mailer'),
main.get_redirect_route(
r'/cron/jobs/cleanup', cron.CronMapreduceCleanupHandler,
'job_cleanup_handler'),
]


Expand Down

0 comments on commit 7b2d568

Please sign in to comment.