Skip to content

Commit

Permalink
Starting to integrate the EventDispatcher
Browse files Browse the repository at this point in the history
Adding a few events to the main Worker class.  Each event will be able
to cancel a Job / Worker from running.
  • Loading branch information
justinrainbow committed Jul 22, 2012
1 parent e2a0f10 commit 33988d6
Show file tree
Hide file tree
Showing 13 changed files with 301 additions and 35 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
vendor
phpunit.xml
19 changes: 9 additions & 10 deletions src/Presque/AbstractJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,28 +27,27 @@ public function getStatus()
return $this->status;
}

/**
* {@inheritDoc}
*/
public function isSuccessful()
{
return StatusInterface::SUCCESS === $this->getStatus();
}

/**
* {@inheritDoc}
*/
public function isError()
{
return StatusInterface::FAILED === $this->getStatus();
}

/**
* {@inheritDoc}
*/
public function isActive()
{
return StatusInterface::RUNNING === $this->getStatus();
}

public function isTrackable()
{

}

public function getMaxAttempts()
{

}
}
62 changes: 62 additions & 0 deletions src/Presque/Event/JobEvent.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
<?php

/*
* This file is part of the Presque package.
*
* (c) Justin Rainbow <justin.rainbow@gmail.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Presque\Event;

use Presque\JobInterface;
use Presque\QueueInterface;
use Presque\WorkerInterface;

class JobEvent extends Event
{
private $worker;
private $canceled;

public function __construct(JobInterface $job, QueueInterface $queue, WorkerInterface $worker)
{
$this->job = $job;
$this->queue = $queue;
$this->worker = $worker;
$this->canceled = false;
}

public function getJob()
{
return $this->job;
}

public function setJob(JobInterface $job)
{
$this->job = $job;

return $this;
}

public function getQueue()
{
return $this->queue;
}

public function getWorker()
{
return $this->worker;
}

public function cancel()
{
$this->canceled = true;
}

public function isCanceled()
{
return $this->canceled;
}
}
19 changes: 19 additions & 0 deletions src/Presque/Event/WorkerEvent.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,31 @@

namespace Presque\Event;

use Presque\WorkerInterface;

class WorkerEvent extends Event
{
private $worker;
private $canceled;

public function __construct(WorkerInterface $worker)
{
$this->worker = $worker;
$this->canceled = false;
}

public function getWorker()
{
return $this->worker;
}

public function cancel()
{
$this->canceled = true;
}

public function isCanceled()
{
return $this->canceled;
}
}
2 changes: 2 additions & 0 deletions src/Presque/Events.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,6 @@ final class Events
const WORK_PAUSED = 'presque.work.paused';

const WORK_STOPPED = 'presque.work.stopped';

const JOB_STARTED = 'presque.job.started';
}
73 changes: 48 additions & 25 deletions src/Presque/Job.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,20 @@ class Job extends AbstractJob
private $reflMethod;
private $instance;

/**
* {@inheritDoc}
*/
public static function create($class, array $args = array())
{
return new static($class, $args);
}

/**
* @param string $class Class that will be performing
* @param array $args List of arguments to pass to the `$class->perform()` method
*
* @throws InvalidArgumentException If `$class` does not implement the required methods
*/
public function __construct($class, array $args = array())
{
try {
Expand All @@ -50,8 +59,15 @@ public function __construct($class, array $args = array())
$method = $refl->getMethod('perform');

if (!$method->isPublic()) {
$visibility = 'unknown';
if ($method->isPrivate()) {
$visibility = 'private';
} elseif ($method->isProtected()) {
$visibility = 'protected';
}

throw new InvalidArgumentException(
'The "perform" method for class "' . $class . '" must be public, not ' . $method->getVisiblity()
'The "perform" method for class "' . $class . '" must be public, not ' . $visibility
);
}

Expand All @@ -69,6 +85,37 @@ public function __construct($class, array $args = array())
$this->reflMethod = $method;
}

/**
* {@inheritDoc}
*/
public function getClass()
{
return $this->class;
}

/**
* {@inheritDoc}
*/
public function getArguments()
{
return $this->args;
}

/**
* {@inheritDoc}
*/
public function getInstance()
{
if (!$this->instance) {
$this->instance = $this->reflClass->newInstance();
}

return $this->instance;
}

/**
* {@inheritDoc}
*/
public function perform()
{
$this->lastResult = $this->lastError = null;
Expand All @@ -89,28 +136,4 @@ public function perform()

return $this;
}

public function getMaxAttempts()
{

}

public function getClass()
{
return $this->class;
}

public function getArguments()
{
return $this->args;
}

public function getInstance()
{
if (!$this->instance) {
$this->instance = $this->reflClass->newInstance();
}

return $this->instance;
}
}
35 changes: 35 additions & 0 deletions src/Presque/JobInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,52 @@

interface JobInterface
{
/**
* @param string $class Class that will be performing
* @param array $args List of arguments to pass to the `$class->perform()` method
*
* @throws InvalidArgumentException If `$class` does not implement the required methods
*/
static function create($class, array $args = array());

/**
* @return Boolean
*/
function isSuccessful();

/**
* @return Boolean
*/
function isError();

/**
* @return Boolean
*/
function isActive();

/**
* Does the work
*/
function perform();

/**
* Returns an instance of the `class`
*
* @return mixed
*/
function getInstance();

/**
* Returns the class name associated with this Job
*
* @return string
*/
function getClass();

/**
* Returns a list of arguments to pass to the `perform` method
*
* @return array
*/
function getArguments();
}
21 changes: 21 additions & 0 deletions src/Presque/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,15 @@
namespace Presque;

use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Presque\Event\JobEvent;
use Presque\Event\WorkerEvent;

class Worker implements WorkerInterface
{
private $id;
private $queues;
private $status;
private $eventDispatcher;

public function __construct($id = null)
{
Expand Down Expand Up @@ -111,6 +114,14 @@ public function isDying()
*/
public function start()
{
if ($this->hasEventDispatcher()) {
$event = $this->eventDispatcher(Events::WORK_STARTED, new WorkerEvent($this));

if ($event->isCanceled()) {
return;
}
}

$this->setStatus(StatusInterface::RUNNING);

$this->run();
Expand Down Expand Up @@ -145,6 +156,16 @@ protected function process(QueueInterface $queue)
return;
}

if ($this->hasEventDispatcher()) {
$event = $this->eventDispatcher->dispatch(Events::JOB_STARTED, new JobEvent($job, $queue, $this));

if ($event->isCanceled()) {
return;
}

$job = $event->getJob();
}

$job->perform();

if ($job->isSuccessful()) {
Expand Down
24 changes: 24 additions & 0 deletions tests/Presque/Tests/JobTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,30 @@ public function testCreatingASimpleJobWithTooFewArguments()
Job::create('Presque\Tests\Jobs\SimpleJob', array(1));
}

/**
* @expectedException Presque\Exception\InvalidArgumentException
*/
public function testCreatingAJobWithInvalidPerformMethod()
{
Job::create('Presque\Tests\Jobs\InvalidJob');
}

/**
* @expectedException Presque\Exception\InvalidArgumentException
*/
public function testCreatingAJobWithPrivatePerformMethod()
{
Job::create('Presque\Tests\Jobs\PrivateJob');
}

/**
* @expectedException Presque\Exception\InvalidArgumentException
*/
public function testCreatingAnIncompleteJob()
{
Job::create('Presque\Tests\Jobs\IncompleteJob');
}

public function testCreatingASimpleJob()
{
$job = Job::create('Presque\Tests\Jobs\SimpleJob', array('simple', 'jobs'));
Expand Down
Loading

0 comments on commit 33988d6

Please sign in to comment.