Skip to content

Commit

Permalink
More tests and interfaces being added
Browse files Browse the repository at this point in the history
Just about have a very basic working version.
  • Loading branch information
justinrainbow committed Jul 22, 2012
1 parent 37aba9f commit b482871
Show file tree
Hide file tree
Showing 12 changed files with 176 additions and 18 deletions.
12 changes: 12 additions & 0 deletions src/Presque/AbstractJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,18 @@

abstract class AbstractJob implements JobInterface
{
protected $status;

public function setStatus($status)
{
$this->status = $status;
}

public function getStatus()
{
return $this->status;
}

public function isSuccessful()
{
return StatusInterface::SUCCESS === $this->getStatus();
Expand Down
22 changes: 22 additions & 0 deletions src/Presque/Event/WorkerEvent.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<?php

/*
* This file is part of the Presque package.
*
* (c) Justin Rainbow <justin.rainbow@gmail.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Presque\Event;

class WorkerEvent extends Event
{
private $worker;

public function __construct(WorkerInterface $worker)
{
$this->worker = $worker;
}
}
5 changes: 5 additions & 0 deletions src/Presque/Events.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,9 @@

final class Events
{
const WORK_STARTED = 'presque.work.started';

const WORK_PAUSED = 'presque.work.paused';

const WORK_STOPPED = 'presque.work.stopped';
}
24 changes: 20 additions & 4 deletions src/Presque/Job.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ class Job extends AbstractJob
protected $class;
protected $args;

protected $lastResult;
protected $lastError;

private $reflClass;
private $reflMethod;
private $instance;
Expand Down Expand Up @@ -68,10 +71,23 @@ public function __construct($class, array $args = array())

public function perform()
{
return $this->reflMethod->invokeArgs(
$this->getInstance(),
$this->getArguments()
);
$this->lastResult = $this->lastError = null;

$this->setStatus(StatusInterface::RUNNING);

try {
$this->lastResult = $this->reflMethod->invokeArgs(
$this->getInstance(),
$this->getArguments()
);

$this->setStatus(StatusInterface::SUCCESS);
} catch (\Exception $e) {
$this->setStatus(StatusInterface::FAILED);
$this->lastError = $e;
}

return $this;
}

public function getMaxAttempts()
Expand Down
22 changes: 15 additions & 7 deletions src/Presque/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,25 @@ class Queue implements QueueInterface
{
protected $name;
protected $storage;
protected $timeout;

public function __construct($name, StorageInterface $storage = null)
public function __construct($name, StorageInterface $storage = null, $timeout = 10)
{
$this->name = $name;
$this->storage = $storage;
$this->timeout = $timeout;
}

public function getName()
{
return $this->name;
}

public function getTimeout()
{
return $this->timeout;
}

public function getStorage()
{
return $this->storage;
Expand All @@ -39,11 +46,6 @@ public function setStorage(StorageInterface $storage)
$this->storage = $storage;
}

public function getWaitTime()
{

}

public function enqueue(JobInterface $job)
{
$this->storage->push($this->name, array(
Expand All @@ -52,8 +54,14 @@ public function enqueue(JobInterface $job)
));
}

public function dequeue($waitFor = null)
public function reserve()
{
$payload = $this->storage->pop($this->name, $this->getTimeout());

if (!is_array($payload)) {
return false;
}

return Job::create($payload['class'], $payload['args']);
}
}
4 changes: 2 additions & 2 deletions src/Presque/QueueInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ function getStorage();

function setStorage(StorageInterface $storage);

function getWaitTime();
function getTimeout();

function enqueue(JobInterface $job);

function dequeue($waitFor = null);
function reserve();
}
3 changes: 2 additions & 1 deletion src/Presque/StatusInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@
* file that was distributed with this source code.
*/

namespace Presque\Storage;
namespace Presque;

interface StatusInterface
{
CONST SUCCESS = 1;
CONST FAILED = 2;
CONST RUNNING = 4;
CONST EXPIRED = 7;
CONST DYING = 8;
}
2 changes: 1 addition & 1 deletion src/Presque/Storage/PredisStorage.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public function __construct(Client $connection, $prefix = null)

public function push($listName, $payload)
{

$this->connection->lpush($this->getKey($listName), json_encode($payload));
}

public function pop($listName, $waitTimeout = null)
Expand Down
59 changes: 58 additions & 1 deletion src/Presque/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,63 @@

use Symfony\Component\EventDispatcher\EventDispatcherInterface;

class Worker
class Worker implements WorkerInterface
{
private $id;
private $queues;

public function __construct($id)
{
$this->id = $id;
}

public function getId()
{
return $this->id;
}

public function addQueue(QueueInterface $queue)
{
$this->queues[] = $queue;
}

public function removeQueue(QueueInterface $queue)
{
if ($key = array_search($queue, $this->queues)) {
unset($this->queues[$queue]);
}
}

public function getQueues()
{
return $this->queues;
}

public function isRunning()
{
return $this->getStatus() === StatusInterface::RUNNING;
}

public function isDying()
{
return $this->getStatus() === StatusInterface::DYING;
}

public function run()
{
while ($this->isRunning()) {
foreach ($this->getQueues() as $queue) {
$this->process($queue);

if (!$this->isRunning() || $this->isDying()) {
return;
}
}
}
}

protected function process(QueueInterface $queue)
{

}
}
12 changes: 12 additions & 0 deletions src/Presque/WorkerInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,18 @@

interface WorkerInterface extends EventDispatcherAwareInterface
{
function getId();

function addQueue(QueueInterface $queue);

function removeQueue(QueueInterface $queue);

function getQueues();

function isRunning();

function isDying();

function start();

function pause();
Expand Down
2 changes: 1 addition & 1 deletion tests/Presque/Tests/JobTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public function testCreatingASimpleJob()
}

$job = Job::create('Presque\Tests\Jobs\SimpleJob', array('simple', 'job'));
$this->assertTrue($job->perform());
$this->assertTrue($job->perform()->isSuccessful());
}

}
27 changes: 26 additions & 1 deletion tests/Presque/Tests/QueueTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class QueueTest extends TestCase
public function testAddingJobsToQueue()
{
$job = $this->getMock('Presque\JobInterface');
$storage = $this->getMock('Presque\Storage\StorageInterface');
$storage = $this->getStorageMock();

$queue = new Queue('queue');
$queue->setStorage($storage);
Expand All @@ -44,6 +44,31 @@ public function testAddingJobsToQueue()
->with($this->equalTo('queue'), $this->equalTo($expectedPayload));

$queue->enqueue($job);
}

public function testGrabbingJobFromQueue()
{
$storage = $this->getStorageMock();
$storage
->expects($this->once())
->method('pop')
->with($this->equalTo('queuyou'), $this->equalTo(10))
->will($this->returnValue(array(
'class' => 'Presque\Tests\Jobs\SimpleJob',
'args' => array('simple', 'job')
)));

$queue = new Queue('queuyou', $storage);

$job = $queue->reserve();

$this->assertInstanceOf('Presque\JobInterface', $job);
$this->assertEquals('Presque\Tests\Jobs\SimpleJob', $job->getClass());
$this->assertEquals(array('simple', 'job'), $job->getArguments());
}

private function getStorageMock()
{
return $this->getMock('Presque\Storage\StorageInterface');
}
}

0 comments on commit b482871

Please sign in to comment.