diff --git a/src/Presque/AbstractJob.php b/src/Presque/AbstractJob.php index 0731f3a..21e8e53 100644 --- a/src/Presque/AbstractJob.php +++ b/src/Presque/AbstractJob.php @@ -15,6 +15,18 @@ abstract class AbstractJob implements JobInterface { + protected $status; + + public function setStatus($status) + { + $this->status = $status; + } + + public function getStatus() + { + return $this->status; + } + public function isSuccessful() { return StatusInterface::SUCCESS === $this->getStatus(); diff --git a/src/Presque/Event/WorkerEvent.php b/src/Presque/Event/WorkerEvent.php new file mode 100644 index 0000000..db4075f --- /dev/null +++ b/src/Presque/Event/WorkerEvent.php @@ -0,0 +1,22 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Presque\Event; + +class WorkerEvent extends Event +{ + private $worker; + + public function __construct(WorkerInterface $worker) + { + $this->worker = $worker; + } +} \ No newline at end of file diff --git a/src/Presque/Events.php b/src/Presque/Events.php index 49ffad5..fc465a6 100644 --- a/src/Presque/Events.php +++ b/src/Presque/Events.php @@ -13,4 +13,9 @@ final class Events { + const WORK_STARTED = 'presque.work.started'; + + const WORK_PAUSED = 'presque.work.paused'; + + const WORK_STOPPED = 'presque.work.stopped'; } \ No newline at end of file diff --git a/src/Presque/Job.php b/src/Presque/Job.php index 1fd8545..3654265 100644 --- a/src/Presque/Job.php +++ b/src/Presque/Job.php @@ -18,6 +18,9 @@ class Job extends AbstractJob protected $class; protected $args; + protected $lastResult; + protected $lastError; + private $reflClass; private $reflMethod; private $instance; @@ -68,10 +71,23 @@ public function __construct($class, array $args = array()) public function perform() { - return $this->reflMethod->invokeArgs( - $this->getInstance(), - $this->getArguments() - ); + $this->lastResult = $this->lastError = null; + + $this->setStatus(StatusInterface::RUNNING); + + try { + $this->lastResult = $this->reflMethod->invokeArgs( + $this->getInstance(), + $this->getArguments() + ); + + $this->setStatus(StatusInterface::SUCCESS); + } catch (\Exception $e) { + $this->setStatus(StatusInterface::FAILED); + $this->lastError = $e; + } + + return $this; } public function getMaxAttempts() diff --git a/src/Presque/Queue.php b/src/Presque/Queue.php index 6e65b84..bda0c2e 100644 --- a/src/Presque/Queue.php +++ b/src/Presque/Queue.php @@ -17,11 +17,13 @@ class Queue implements QueueInterface { protected $name; protected $storage; + protected $timeout; - public function __construct($name, StorageInterface $storage = null) + public function __construct($name, StorageInterface $storage = null, $timeout = 10) { $this->name = $name; $this->storage = $storage; + $this->timeout = $timeout; } public function getName() @@ -29,6 +31,11 @@ public function getName() return $this->name; } + public function getTimeout() + { + return $this->timeout; + } + public function getStorage() { return $this->storage; @@ -39,11 +46,6 @@ public function setStorage(StorageInterface $storage) $this->storage = $storage; } - public function getWaitTime() - { - - } - public function enqueue(JobInterface $job) { $this->storage->push($this->name, array( @@ -52,8 +54,14 @@ public function enqueue(JobInterface $job) )); } - public function dequeue($waitFor = null) + public function reserve() { + $payload = $this->storage->pop($this->name, $this->getTimeout()); + + if (!is_array($payload)) { + return false; + } + return Job::create($payload['class'], $payload['args']); } } \ No newline at end of file diff --git a/src/Presque/QueueInterface.php b/src/Presque/QueueInterface.php index f4f82d9..4deaa05 100644 --- a/src/Presque/QueueInterface.php +++ b/src/Presque/QueueInterface.php @@ -21,9 +21,9 @@ function getStorage(); function setStorage(StorageInterface $storage); - function getWaitTime(); + function getTimeout(); function enqueue(JobInterface $job); - function dequeue($waitFor = null); + function reserve(); } \ No newline at end of file diff --git a/src/Presque/StatusInterface.php b/src/Presque/StatusInterface.php index 88a3481..5a8bbb5 100644 --- a/src/Presque/StatusInterface.php +++ b/src/Presque/StatusInterface.php @@ -9,7 +9,7 @@ * file that was distributed with this source code. */ -namespace Presque\Storage; +namespace Presque; interface StatusInterface { @@ -17,4 +17,5 @@ interface StatusInterface CONST FAILED = 2; CONST RUNNING = 4; CONST EXPIRED = 7; + CONST DYING = 8; } \ No newline at end of file diff --git a/src/Presque/Storage/PredisStorage.php b/src/Presque/Storage/PredisStorage.php index c66a81f..a407910 100644 --- a/src/Presque/Storage/PredisStorage.php +++ b/src/Presque/Storage/PredisStorage.php @@ -26,7 +26,7 @@ public function __construct(Client $connection, $prefix = null) public function push($listName, $payload) { - + $this->connection->lpush($this->getKey($listName), json_encode($payload)); } public function pop($listName, $waitTimeout = null) diff --git a/src/Presque/Worker.php b/src/Presque/Worker.php index 1f71fa3..e90e73c 100644 --- a/src/Presque/Worker.php +++ b/src/Presque/Worker.php @@ -13,6 +13,63 @@ use Symfony\Component\EventDispatcher\EventDispatcherInterface; -class Worker +class Worker implements WorkerInterface { + private $id; + private $queues; + + public function __construct($id) + { + $this->id = $id; + } + + public function getId() + { + return $this->id; + } + + public function addQueue(QueueInterface $queue) + { + $this->queues[] = $queue; + } + + public function removeQueue(QueueInterface $queue) + { + if ($key = array_search($queue, $this->queues)) { + unset($this->queues[$queue]); + } + } + + public function getQueues() + { + return $this->queues; + } + + public function isRunning() + { + return $this->getStatus() === StatusInterface::RUNNING; + } + + public function isDying() + { + return $this->getStatus() === StatusInterface::DYING; + } + + public function run() + { + while ($this->isRunning()) { + foreach ($this->getQueues() as $queue) { + $this->process($queue); + + if (!$this->isRunning() || $this->isDying()) { + return; + } + } + } + } + + protected function process(QueueInterface $queue) + { + + } } \ No newline at end of file diff --git a/src/Presque/WorkerInterface.php b/src/Presque/WorkerInterface.php index 319db6a..4926037 100644 --- a/src/Presque/WorkerInterface.php +++ b/src/Presque/WorkerInterface.php @@ -13,6 +13,18 @@ interface WorkerInterface extends EventDispatcherAwareInterface { + function getId(); + + function addQueue(QueueInterface $queue); + + function removeQueue(QueueInterface $queue); + + function getQueues(); + + function isRunning(); + + function isDying(); + function start(); function pause(); diff --git a/tests/Presque/Tests/JobTest.php b/tests/Presque/Tests/JobTest.php index f99613c..148fe31 100644 --- a/tests/Presque/Tests/JobTest.php +++ b/tests/Presque/Tests/JobTest.php @@ -58,7 +58,7 @@ public function testCreatingASimpleJob() } $job = Job::create('Presque\Tests\Jobs\SimpleJob', array('simple', 'job')); - $this->assertTrue($job->perform()); + $this->assertTrue($job->perform()->isSuccessful()); } } \ No newline at end of file diff --git a/tests/Presque/Tests/QueueTest.php b/tests/Presque/Tests/QueueTest.php index e261727..553ce03 100644 --- a/tests/Presque/Tests/QueueTest.php +++ b/tests/Presque/Tests/QueueTest.php @@ -18,7 +18,7 @@ class QueueTest extends TestCase public function testAddingJobsToQueue() { $job = $this->getMock('Presque\JobInterface'); - $storage = $this->getMock('Presque\Storage\StorageInterface'); + $storage = $this->getStorageMock(); $queue = new Queue('queue'); $queue->setStorage($storage); @@ -44,6 +44,31 @@ public function testAddingJobsToQueue() ->with($this->equalTo('queue'), $this->equalTo($expectedPayload)); $queue->enqueue($job); + } + + public function testGrabbingJobFromQueue() + { + $storage = $this->getStorageMock(); + $storage + ->expects($this->once()) + ->method('pop') + ->with($this->equalTo('queuyou'), $this->equalTo(10)) + ->will($this->returnValue(array( + 'class' => 'Presque\Tests\Jobs\SimpleJob', + 'args' => array('simple', 'job') + ))); + $queue = new Queue('queuyou', $storage); + + $job = $queue->reserve(); + + $this->assertInstanceOf('Presque\JobInterface', $job); + $this->assertEquals('Presque\Tests\Jobs\SimpleJob', $job->getClass()); + $this->assertEquals(array('simple', 'job'), $job->getArguments()); + } + + private function getStorageMock() + { + return $this->getMock('Presque\Storage\StorageInterface'); } } \ No newline at end of file