Migrate the existing StatsPageJobManager to use the new continuous computations framework.
seanlip committed Jul 30, 2014
1 parent 540fce2 commit 7eed7a4
Showing 14 changed files with 219 additions and 141 deletions.
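
In outline, this change replaces the cron-enqueued StatisticsPageJobManager batch job with a StatisticsAggregator continuous computation: event handlers feed a realtime layer as reader events arrive, periodic MapReduce batch runs fold older events into ExplorationAnnotationsModel, and callers query a single method that sums both layers. A minimal usage sketch, assuming only the interfaces visible in the diff below (the exploration id is a placeholder):

    from core.domain import stats_jobs

    # Start the continuous computation. The old cron handler enqueued the batch
    # job directly; that path is removed in core/controllers/cron.py below.
    stats_jobs.StatisticsAggregator.start_computation()

    # Combined batch + realtime totals for one exploration.
    stats = stats_jobs.StatisticsAggregator.get_statistics('exp_id')
    num_starts = stats['start_exploration_count']
    num_completions = stats['complete_exploration_count']

    # Stop scheduling further batch runs. The updated editor test below pairs
    # start_computation() with stop_computation() to run exactly one round.
    stats_jobs.StatisticsAggregator.stop_computation()
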
14 changes: 1 addition & 13 deletions core/controllers/cron.py
@@ -14,16 +14,4 @@

"""Controllers for the cron jobs."""

from core.controllers import base
from core import jobs_registry


class StatisticsHandler(base.BaseHandler):
"""Handler for statistics cron job."""

def get(self):
"""Handles GET requests."""
for klass in jobs_registry.JOB_MANAGER_CLASSES:
if klass.__name__ == 'StatisticsPageJobManager':
klass.enqueue(klass.create_new())
break
pass
2 changes: 1 addition & 1 deletion core/controllers/editor.py
@@ -546,7 +546,7 @@ def get(self, exploration_id):
raise self.PageNotFoundException

self.render_json({
'num_visits': stats_services.get_exploration_visit_count(
'num_starts': stats_services.get_exploration_start_count(
exploration_id),
'num_completions': stats_services.get_exploration_completed_count(
exploration_id),
11 changes: 5 additions & 6 deletions core/controllers/editor_test.py
@@ -303,7 +303,7 @@ def test_state_stats_for_default_exploration(self):
self.login('editor@example.com')

editor_exploration_dict = self.get_json(EXPLORATION_STATISTICS_URL)
self.assertEqual(editor_exploration_dict['num_visits'], 0)
self.assertEqual(editor_exploration_dict['num_starts'], 0)
self.assertEqual(editor_exploration_dict['num_completions'], 0)

# Switch to the reader perspective. First submit the first
@@ -335,15 +335,14 @@ def test_state_stats_for_default_exploration(self):
# Now switch back to the editor perspective.
self.login('editor@example.com')

# Trigger a stats update
job_id = (
stats_jobs.StatisticsPageJobManager.create_new())
stats_jobs.StatisticsPageJobManager.enqueue(job_id)
# Trigger a single round of stats updating.
stats_jobs.StatisticsAggregator.start_computation()
stats_jobs.StatisticsAggregator.stop_computation()
self.assertEqual(self.count_jobs_in_taskqueue(), 1)
self.process_and_flush_pending_tasks()

editor_exploration_dict = self.get_json(EXPLORATION_STATISTICS_URL)
self.assertEqual(editor_exploration_dict['num_visits'], 1)
self.assertEqual(editor_exploration_dict['num_starts'], 1)
self.assertEqual(editor_exploration_dict['num_completions'], 0)

# TODO(sll): Add more checks here.
22 changes: 8 additions & 14 deletions core/domain/event_services.py
@@ -20,18 +20,12 @@

import inspect

from core import jobs
from core import jobs_registry
from core.domain import exp_domain
from core.platform import models
(stats_models,) = models.Registry.import_models([models.NAMES.statistics])
taskqueue_services = models.Registry.import_taskqueue_services()


EVENT_TYPE_STATE_HIT = 'state_hit'
EVENT_TYPE_ANSWER_SUBMITTED = 'answer_submitted'
EVENT_TYPE_DEFAULT_ANSWER_RESOLVED = 'default_answer_resolved'
EVENT_TYPE_START_EXPLORATION = 'start_exploration'
EVENT_TYPE_MAYBE_LEAVE_EXPLORATION = 'maybe_leave_exploration'
import feconf


class BaseEventHandler(object):
@@ -47,7 +41,7 @@ def _notify_continuous_computation_listeners_async(cls, *args, **kwargs):
layers that are listening for them.
"""
taskqueue_services.defer(
jobs.ContinuousComputationEventDispatcher.dispatch_event,
jobs_registry.ContinuousComputationEventDispatcher.dispatch_event,
cls.EVENT_TYPE, *args, **kwargs)

@classmethod
@@ -71,7 +65,7 @@ def record(cls, *args, **kwargs):
class StateHitEventHandler(BaseEventHandler):
"""Event handler for recording state hits."""

EVENT_TYPE = EVENT_TYPE_STATE_HIT
EVENT_TYPE = feconf.EVENT_TYPE_STATE_HIT

@classmethod
def _handle_event(cls, exploration_id, state_name, first_time):
@@ -83,7 +77,7 @@ def _handle_event(cls, exploration_id, state_name, first_time):
class AnswerSubmissionEventHandler(BaseEventHandler):
"""Event handler for recording answer submissions."""

EVENT_TYPE = EVENT_TYPE_ANSWER_SUBMITTED
EVENT_TYPE = feconf.EVENT_TYPE_ANSWER_SUBMITTED

@classmethod
def _notify_continuous_computation_listeners_async(cls, *args, **kwargs):
@@ -105,7 +99,7 @@ class DefaultRuleAnswerResolutionEventHandler(BaseEventHandler):
"""Event handler for recording resolving of answers triggering the default
rule."""

EVENT_TYPE = EVENT_TYPE_DEFAULT_ANSWER_RESOLVED
EVENT_TYPE = feconf.EVENT_TYPE_DEFAULT_ANSWER_RESOLVED

@classmethod
def _handle_event(cls, exploration_id, state_name, handler_name, answers):
@@ -119,7 +113,7 @@ def _handle_event(cls, exploration_id, state_name, handler_name, answers):
class StartExplorationEventHandler(BaseEventHandler):
"""Event handler for recording exploration start events."""

EVENT_TYPE = EVENT_TYPE_START_EXPLORATION
EVENT_TYPE = feconf.EVENT_TYPE_START_EXPLORATION

@classmethod
def _handle_event(cls, exp_id, exp_version, state_name, session_id,
@@ -132,7 +126,7 @@ def _handle_event(cls, exp_id, exp_version, state_name, session_id,
class MaybeLeaveExplorationEventHandler(BaseEventHandler):
"""Event handler for recording exploration leave events."""

EVENT_TYPE = EVENT_TYPE_MAYBE_LEAVE_EXPLORATION
EVENT_TYPE = feconf.EVENT_TYPE_MAYBE_LEAVE_EXPLORATION

@classmethod
def _handle_event(
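
For orientation, the handlers above keep their public record() entry points; what changes here is that the event-type constants move to feconf and that BaseEventHandler also defers each event to the continuous-computation dispatcher via the task queue. A minimal sketch of the resulting flow, using StateHitEventHandler because its _handle_event() signature is fully visible above, and assuming record() forwards its arguments to _handle_event() and to _notify_continuous_computation_listeners_async() (argument values are placeholders):

    from core.domain import event_services

    # Arguments mirror _handle_event(exploration_id, state_name, first_time).
    event_services.StateHitEventHandler.record('exp_id', 'First State', True)

    # Synchronously, the handler persists the event as before. In addition,
    # taskqueue_services.defer(...) schedules
    #   jobs_registry.ContinuousComputationEventDispatcher.dispatch_event(
    #       feconf.EVENT_TYPE_STATE_HIT, ...)
    # which hands the event to any aggregator whose
    # get_event_types_listened_to() includes that event type.
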
127 changes: 112 additions & 15 deletions core/domain/stats_jobs.py
@@ -15,22 +15,117 @@
"""Jobs for statistics views."""

import ast
from datetime import datetime
import datetime

from core import jobs
from core.domain import exp_services
from core.platform import models
(stats_models,) = models.Registry.import_models([models.NAMES.statistics])
(base_models, stats_models,) = models.Registry.import_models([
models.NAMES.base_model, models.NAMES.statistics])
transaction_services = models.Registry.import_transaction_services()
import feconf
import logging
import utils

from google.appengine.ext import ndb

class StatisticsPageJobManager(jobs.BaseMapReduceJobManager):


class StatisticsRealtimeModel0(base_models.BaseModel):
num_starts = ndb.IntegerProperty(default=0)
num_completions = ndb.IntegerProperty(default=0)


class StatisticsRealtimeModel1(base_models.BaseModel):
num_starts = ndb.IntegerProperty(default=0)
num_completions = ndb.IntegerProperty(default=0)


class StatisticsAggregator(jobs.BaseContinuousComputationManager):
"""A continuous-computation job that counts 'start exploration' and
'complete exploration' events.
"""
@classmethod
def get_event_types_listened_to(cls):
return [
feconf.EVENT_TYPE_START_EXPLORATION,
feconf.EVENT_TYPE_MAYBE_LEAVE_EXPLORATION]

@classmethod
def _get_realtime_datastore_classes(cls):
return [
StatisticsRealtimeModel0, StatisticsRealtimeModel1]

@classmethod
def _get_batch_job_manager_class(cls):
return StatisticsMRJobManager

@classmethod
def _handle_incoming_event(cls, datastore_class, event_type, *args):
exp_id = args[0]

def _increment_visit_counter():
model = datastore_class.get(exp_id, strict=False)
if model is None:
datastore_class(id=exp_id, num_starts=1).put()
else:
model.num_starts += 1
model.put()

def _increment_completion_counter():
model = datastore_class.get(exp_id, strict=False)
if model is None:
datastore_class(id=exp_id, num_completions=1).put()
else:
model.num_completions += 1
model.put()

if event_type == feconf.EVENT_TYPE_START_EXPLORATION:
transaction_services.run_in_transaction(
_increment_visit_counter)
else:
transaction_services.run_in_transaction(
_increment_completion_counter)

# Public query method.
@classmethod
def get_statistics(cls, exploration_id):
"""Returns a dict with two keys: 'start_exploration_count' and
'complete_exploration_count'. The corresponding values are the
number of times the given exploration was started and completed,
respectively.
"""
mr_model = stats_models.ExplorationAnnotationsModel.get(
exploration_id, strict=False)
realtime_model = cls._get_active_realtime_datastore_class().get(
exploration_id, strict=False)

num_starts = 0
if mr_model is not None:
num_starts += mr_model.num_starts
if realtime_model is not None:
num_starts += realtime_model.num_starts

num_completions = 0
if mr_model is not None:
num_completions += mr_model.num_completions
if realtime_model is not None:
num_completions += realtime_model.num_completions

return {
'start_exploration_count': num_starts,
'complete_exploration_count': num_completions,
}


class StatisticsMRJobManager(
jobs.BaseMapReduceJobManagerForContinuousComputations):
"""Job that calculates and creates stats models for exploration view.
Includes: * number of visits to the exploration
* number of completions of the exploration
"""
@classmethod
def _get_continuous_computation_class(cls):
return StatisticsAggregator

@classmethod
def entity_classes_to_map_over(cls):
@@ -39,26 +134,27 @@ def entity_classes_to_map_over(cls):

@staticmethod
def map(item):
map_value = {'event_type': item.event_type,
'session_id': item.session_id,
'created_on': int(utils.get_time_in_millisecs(item.created_on)),
'state_name': item.state_name}
yield (item.exploration_id, map_value)
if StatisticsMRJobManager._entity_created_before_job_queued(item):
yield (item.exploration_id, {
'event_type': item.event_type,
'session_id': item.session_id,
'state_name': item.state_name})

@staticmethod
def reduce(key, stringified_values):
started_count = 0
complete_count = 0
for value_str in stringified_values:
value = ast.literal_eval(value_str)
if value['event_type'] == feconf.EVENT_TYPE_START:
if value['event_type'] == feconf.EVENT_TYPE_START_EXPLORATION:
started_count += 1
elif value['event_type'] == feconf.EVENT_TYPE_LEAVE:
elif (value['event_type'] ==
feconf.EVENT_TYPE_MAYBE_LEAVE_EXPLORATION):
if value['state_name'] == feconf.END_DEST:
complete_count += 1
stats_models.ExplorationAnnotationsModel(
id=key,
num_visits=started_count,
num_starts=started_count,
num_completions=complete_count).put()


@@ -145,12 +241,12 @@ def reduce(key, stringified_values):
for _ in range(missing_events_count):
version = exp_services.get_exploration_by_id(
value['exp_id']).version
created_on = datetime.fromtimestamp(
created_on = datetime.datetime.fromtimestamp(
value['created_on'] / 1000)
if state_name != feconf.END_DEST:
start_event_entity = (
stats_models.StartExplorationEventLogEntryModel(
event_type=feconf.EVENT_TYPE_START,
event_type=feconf.EVENT_TYPE_START_EXPLORATION,
exploration_id=value['exp_id'],
exploration_version=version,
state_name=value['state_name'],
@@ -164,7 +260,8 @@ else:
else:
leave_event_entity = (
stats_models.MaybeLeaveExplorationEventLogEntryModel(
event_type=feconf.EVENT_TYPE_LEAVE,
event_type=(
feconf.EVENT_TYPE_MAYBE_LEAVE_EXPLORATION),
exploration_id=value['exp_id'],
exploration_version=version,
state_name=state_name,
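
To make the batch side concrete, here is a small self-contained sketch of what StatisticsMRJobManager.reduce() computes for a single exploration. The string literals stand in for the feconf constants, and the final counts stand in for the ExplorationAnnotationsModel(id=key, num_starts=..., num_completions=...) write:

    import ast

    END_DEST = 'END_DEST'        # stand-in for feconf.END_DEST
    START = 'start'              # stand-in for feconf.EVENT_TYPE_START_EXPLORATION
    MAYBE_LEAVE = 'maybe_leave'  # stand-in for feconf.EVENT_TYPE_MAYBE_LEAVE_EXPLORATION

    # What map() above might emit for one exploration: one start event and two
    # maybe-leave events, only the second of which reached the END state.
    stringified_values = [
        str({'event_type': START, 'session_id': 's1', 'state_name': 'First'}),
        str({'event_type': MAYBE_LEAVE, 'session_id': 's1', 'state_name': 'Second'}),
        str({'event_type': MAYBE_LEAVE, 'session_id': 's1', 'state_name': END_DEST}),
    ]

    started_count = 0
    complete_count = 0
    for value_str in stringified_values:
        value = ast.literal_eval(value_str)
        if value['event_type'] == START:
            started_count += 1
        elif value['event_type'] == MAYBE_LEAVE:
            if value['state_name'] == END_DEST:
                complete_count += 1

    assert (started_count, complete_count) == (1, 1)
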