Skip to content

Commit

Permalink
Fix oppia#2063: convert continuous jobs into one-off jobs. (oppia#2320)
Browse files Browse the repository at this point in the history
* Change continuous recommendations job into a one-off cron job.

* Change continuous search ranker job to a one-off job.

* Fix lint issues.

* Decrease queue rate.
  • Loading branch information
seanlip authored and wxyxinyu committed Aug 8, 2016
1 parent 69886eb commit 9aa6f61
Show file tree
Hide file tree
Showing 9 changed files with 67 additions and 163 deletions.
23 changes: 23 additions & 0 deletions core/controllers/cron.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

from core import jobs
from core.controllers import base
from core.domain import exp_jobs_one_off
from core.domain import recommendations_jobs_one_off
from core.domain import user_jobs_one_off
from core.platform import models
import utils
Expand Down Expand Up @@ -97,6 +99,27 @@ def get(self):
user_jobs_one_off.DashboardStatsOneOffJob.create_new())


class CronExplorationRecommendationsHandler(base.BaseHandler):
"""Handler for appending dashboard stats to a list."""

@require_cron_or_superadmin
def get(self):
"""Handles GET requests."""
job_class = (
recommendations_jobs_one_off.ExplorationRecommendationsOneOffJob)
job_class.enqueue(job_class.create_new())


class CronExplorationSearchRankHandler(base.BaseHandler):
"""Handler for computing exploration search ranks."""

@require_cron_or_superadmin
def get(self):
"""Handles GET requests."""
exp_jobs_one_off.IndexAllExplorationsJobManager.enqueue(
exp_jobs_one_off.IndexAllExplorationsJobManager.create_new())


class CronMapreduceCleanupHandler(base.BaseHandler):

def get(self):
Expand Down
74 changes: 0 additions & 74 deletions core/domain/exp_jobs_continuous.py

This file was deleted.

2 changes: 1 addition & 1 deletion core/domain/exp_jobs_one_off.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ def reduce(exp_id, stringified_commit_times_msecs):


class IndexAllExplorationsJobManager(jobs.BaseMapReduceJobManager):
"""One-off job that indexes all explorations"""
"""One-off job that indexes all explorations and computes their ranks."""

@classmethod
def entity_classes_to_map_over(cls):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

"""Jobs for recommendations."""
"""One-off jobs for recommendations."""

import ast

Expand All @@ -32,46 +32,10 @@
SIMILARITY_SCORE_THRESHOLD = 3.0


class ExplorationRecommendationsRealtimeModel(
jobs.BaseRealtimeDatastoreClassForContinuousComputations):
pass


class ExplorationRecommendationsAggregator(
jobs.BaseContinuousComputationManager):
"""A continuous-computation job that computes recommendations for each
exploration.
This job does not have a realtime component. There will be a delay in
propagating new updates to recommendations; the length of the delay
will be approximately the time it takes a batch job to run.
class ExplorationRecommendationsOneOffJob(jobs.BaseMapReduceJobManager):
"""A one-off job that computes a list of recommended explorations to play
after completing an exploration.
"""
@classmethod
def get_event_types_listened_to(cls):
return []

@classmethod
def _get_realtime_datastore_class(cls):
return ExplorationRecommendationsRealtimeModel

@classmethod
def _get_batch_job_manager_class(cls):
return ExplorationRecommendationsMRJobManager

@classmethod
def _handle_incoming_event(cls, active_realtime_layer, event_type, *args):
pass


class ExplorationRecommendationsMRJobManager(
jobs.BaseMapReduceJobManagerForContinuousComputations):
"""Manager for a MapReduce job that computes a list of recommended
explorations to play after completing some exploration.
"""
@classmethod
def _get_continuous_computation_class(cls):
return ExplorationRecommendationsAggregator

@classmethod
def entity_classes_to_map_over(cls):
return [exp_models.ExpSummaryModel]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.

"""Tests for recommendations_jobs_continuous."""
"""Tests for recommendations_jobs_one_off."""

from core import jobs_registry
from core.domain import recommendations_jobs_continuous
from core.domain import recommendations_jobs_one_off
from core.domain import recommendations_services
from core.domain import recommendations_services_test
from core.domain import rights_manager
Expand All @@ -27,35 +27,17 @@
taskqueue_services = models.Registry.import_taskqueue_services()


class ModifiedExplorationRecommendationsAggregator(
recommendations_jobs_continuous.ExplorationRecommendationsAggregator):
"""A modified ExplorationRecommendationsAggregator that does not start a
new batch job when the previous one has finished.
"""

@classmethod
def _get_batch_job_manager_class(cls):
return ModifiedExplorationRecommendationsMRJobManager

@classmethod
def _kickoff_batch_job_after_previous_one_ends(cls):
pass


class ModifiedExplorationRecommendationsMRJobManager(
recommendations_jobs_continuous.ExplorationRecommendationsMRJobManager):

@classmethod
def _get_continuous_computation_class(cls):
return ModifiedExplorationRecommendationsAggregator


class ExplorationRecommendationsAggregatorUnitTests(
class ExplorationRecommendationsOneOffJobUnitTests(
recommendations_services_test.RecommendationsServicesUnitTests):
"""Test recommendations services."""
"""Test exploration recommendations one-off job."""

ALL_CC_MANAGERS_FOR_TESTS = [
ModifiedExplorationRecommendationsAggregator]
ONE_OFF_JOB_MANAGERS_FOR_TESTS = [
recommendations_jobs_one_off.ExplorationRecommendationsOneOffJob]

def setUp(self):
super(ExplorationRecommendationsOneOffJobUnitTests, self).setUp()
self.job_class = (
recommendations_jobs_one_off.ExplorationRecommendationsOneOffJob)

def test_basic_computation(self):
recommendations_services.update_topic_similarities(
Expand All @@ -65,10 +47,10 @@ def test_basic_computation(self):
'0.1,0.8,1.0')

with self.swap(
jobs_registry, 'ALL_CONTINUOUS_COMPUTATION_MANAGERS',
self.ALL_CC_MANAGERS_FOR_TESTS
jobs_registry, 'ONE_OFF_JOB_MANAGERS',
self.ONE_OFF_JOB_MANAGERS_FOR_TESTS
):
ModifiedExplorationRecommendationsAggregator.start_computation()
self.job_class.enqueue(self.job_class.create_new())
self.assertEqual(
self.count_jobs_in_taskqueue(
queue_name=taskqueue_services.QUEUE_NAME_DEFAULT),
Expand All @@ -88,10 +70,10 @@ def test_basic_computation(self):

def test_recommendations_after_changes_in_rights(self):
with self.swap(
jobs_registry, 'ALL_CONTINUOUS_COMPUTATION_MANAGERS',
self.ALL_CC_MANAGERS_FOR_TESTS
jobs_registry, 'ONE_OFF_JOB_MANAGERS',
self.ONE_OFF_JOB_MANAGERS_FOR_TESTS
):
ModifiedExplorationRecommendationsAggregator.start_computation()
self.job_class.enqueue(self.job_class.create_new())
self.assertEqual(
self.count_jobs_in_taskqueue(
queue_name=taskqueue_services.QUEUE_NAME_DEFAULT), 1)
Expand All @@ -104,9 +86,8 @@ def test_recommendations_after_changes_in_rights(self):
recommendations, ['exp_id_4', 'exp_id_2', 'exp_id_3'])

rights_manager.unpublish_exploration(self.admin_id, 'exp_id_4')
ModifiedExplorationRecommendationsAggregator.stop_computation(
self.admin_id)
ModifiedExplorationRecommendationsAggregator.start_computation()

self.job_class.enqueue(self.job_class.create_new())
self.assertEqual(
self.count_jobs_in_taskqueue(
queue_name=taskqueue_services.QUEUE_NAME_DEFAULT), 1)
Expand Down
6 changes: 1 addition & 5 deletions core/jobs_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@

"""Job registries."""

from core.domain import exp_jobs_continuous
from core.domain import exp_jobs_one_off
from core.domain import feedback_jobs_continuous
from core.domain import recommendations_jobs_continuous
from core.domain import stats_jobs_continuous
from core.domain import stats_jobs_one_off
from core.domain import user_jobs_continuous
Expand Down Expand Up @@ -48,12 +46,10 @@
# NOTE TO DEVELOPERS: When a new ContinuousComputation manager is defined,
# it should be registered here.
ALL_CONTINUOUS_COMPUTATION_MANAGERS = [
exp_jobs_continuous.SearchRanker,
stats_jobs_continuous.StatisticsAggregator,
user_jobs_continuous.DashboardRecentUpdatesAggregator,
user_jobs_continuous.UserStatsAggregator,
feedback_jobs_continuous.FeedbackAnalyticsAggregator,
recommendations_jobs_continuous.ExplorationRecommendationsAggregator]
feedback_jobs_continuous.FeedbackAnalyticsAggregator]


class ContinuousComputationEventDispatcher(object):
Expand Down
12 changes: 9 additions & 3 deletions cron.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,15 @@ cron:
- description: daily email about mapreduce job statuses
url: /cron/mail/admin/job_status
schedule: every day 16:00
- description: weekly copy of a user's dashboard statistics
url: /cron/users/dashboard_stats
schedule: every monday 09:00
- description: clean up old mapreduce jobs
url: /cron/jobs/cleanup
schedule: every day 08:00
- description: weekly copy of a user's dashboard statistics
url: /cron/users/dashboard_stats
schedule: every monday 09:00
- description: weekly exploration recommendations computation
url: /cron/explorations/recommendations
schedule: every tuesday 9:00
- description: weekly exploration search rank computation
url: /cron/explorations/search_rank
schedule: every wednesday 9:00
8 changes: 8 additions & 0 deletions main_cron.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@
main.get_redirect_route(
r'/cron/users/dashboard_stats', cron.CronDashboardStatsHandler,
'job_creator_dashboard_stats'),
main.get_redirect_route(
r'/cron/explorations/recommendations',
cron.CronExplorationRecommendationsHandler,
'exploration_recommendations_handler'),
main.get_redirect_route(
r'/cron/explorations/search_rank',
cron.CronExplorationSearchRankHandler,
'exploration_search_rank_handler'),
main.get_redirect_route(
r'/cron/jobs/cleanup', cron.CronMapreduceCleanupHandler,
'job_cleanup_handler'),
Expand Down
4 changes: 2 additions & 2 deletions queue.yaml
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
total_storage_limit: 120M
queue:
- name: default
rate: 3/m
rate: 2/m
- name: events
rate: 5/s
- name: backups
rate: 5/s
- name: emails
rate: 5/s
rate: 5/s

0 comments on commit 9aa6f61

Please sign in to comment.