Open
Description
Description
Implement queue (AMQP) wrapper for basic features.
- Send a message to the queue (explicit).
- Receive a message from the queue (explicit).
- Declare queue and DLX (implicit).
- Change the snippet to allow consumption/sending without declaring the DLX queue (A new private method should be created, and an optional parameter to send/receive should be added).
- Refactor the code to allow the usage of multiple servers (the send uses random, and the read loops through all of them).
- The servers should be an array and injected at the constructor.
- The consume method should receive optional parameters as the length of the QoS (on the snippet below, it is fixed in 10).
Tech notes
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
class Queue implements IQueue
{
private $connectionStrings;
public function __construct()
{
// TODO: Remove this code from here, receive the configuration as a parameter.
$configuration = new Configuration();
$this->connectionStrings = $configuration->getRabbitMq();
}
private function getServers()
{
if (empty($this->connectionStrings)) {
throw new QueueException("RabbitMQ connection strings not found");
}
$servers = [];
foreach ($this->connectionStrings as $connectionString) {
$url = parse_url($connectionString);
$servers[] = [
"host" => $url["host"],
"port" => isset($url["port"]) ? $url["port"] : 5672,
"user" => $url["user"],
"password" => $url["pass"],
"vhost" => ($url['path'] == '/' || !isset($url['path'])) ? '/' : substr($url['path'], 1)
];
}
return $servers;
}
private function getConnection()
{
$servers = $this->getServers();
$options = [];
if (count($servers) == 1) {
$options = ['connection_timeout' => 10.0, 'read_write_timeout' => 10.0,];
}
return AMQPStreamConnection::create_connection($servers, $options);
}
private function declareQueueWIthoutDLX($channel, $queueName)
{
// TODO: declare the queue without DLX.
}
// TODO: rename to declareQueueWithDLX
private function declareQueueAndDLX($channel, $queueName)
{
$channel->queue_declare(
$queueName,
false,
true,
false,
false,
false,
new AMQPTable(
array(
'x-dead-letter-exchange' => '',
'x-dead-letter-routing-key' => $queueName . '-retry'
)
)
);
$channel->queue_declare(
$queueName . '-retry',
false,
true,
false,
false,
false,
new AMQPTable(
array(
'x-dead-letter-exchange' => '',
'x-dead-letter-routing-key' => $queueName,
'x-message-ttl' => 1000 * 60 * 60
)
)
);
}
public function publish($queueName, $message)
{
$connection = $this->getConnection();
$channel = $connection->channel();
$this->declareQueueAndDLX($channel, $queueName);
$msgOptions = array(
'content_type' => 'application/json',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
);
$msg = new AMQPMessage($message, $msgOptions);
$channel->basic_publish($msg, '', $queueName);
$channel->close();
$connection->close();
}
public function consume($timeout, $queueName, $callback, $resetTimeoutOnReceive = false)
{
$startTime = time();
$fn = function ($msg) use ($callback, $timeout, $startTime, $resetTimeoutOnReceive) {
if ($resetTimeoutOnReceive) {
$startTime = time();
}
$callback($timeout, $startTime, $msg);
};
$connection = $this->getConnection();
$channel = $connection->channel();
// TODO: check which one to call - declareQueueWithDLX or declareQueueWithoutDLX
$this->declareQueueAndDLX($channel, $queueName);
// TODO: the 10 should be an optional parameter of this method with default value as 10.
$channel->basic_qos(null, 10, null);
$channel->basic_consume($queueName, '', false, false, false, false, $fn);
while ($channel->is_consuming()) {
$channel->wait(null, true);
if ($startTime + $timeout < time()) {
break;
}
}
$channel->close();
$connection->close();
}
}