Skip to content

[FEATURE] Implement Queue wrapper #82

Open
@guibranco

Description

Description

Implement queue (AMQP) wrapper for basic features.

  • Send a message to the queue (explicit).
  • Receive a message from the queue (explicit).
  • Declare queue and DLX (implicit).
  • Change the snippet to allow consumption/sending without declaring the DLX queue (A new private method should be created, and an optional parameter to send/receive should be added).
  • Refactor the code to allow the usage of multiple servers (the send uses random, and the read loops through all of them).
  • The servers should be an array and injected at the constructor.
  • The consume method should receive optional parameters as the length of the QoS (on the snippet below, it is fixed in 10).

Tech notes

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

class Queue implements IQueue
{
    private $connectionStrings;

    public function __construct()
    {
        // TODO: Remove this code from here, receive the configuration as a parameter.
        $configuration = new Configuration();
        $this->connectionStrings = $configuration->getRabbitMq();
    }

    private function getServers()
    {
        if (empty($this->connectionStrings)) {
            throw new QueueException("RabbitMQ connection strings not found");
        }

        $servers = [];
        foreach ($this->connectionStrings as $connectionString) {
            $url = parse_url($connectionString);
            $servers[] = [
                "host" => $url["host"],
                "port" => isset($url["port"]) ? $url["port"] : 5672,
                "user" => $url["user"],
                "password" => $url["pass"],
                "vhost" => ($url['path'] == '/' || !isset($url['path'])) ? '/' : substr($url['path'], 1)
            ];
        }

        return $servers;
    }

    private function getConnection()
    {
        $servers = $this->getServers();
        $options = [];
        if (count($servers) == 1) {
            $options = ['connection_timeout' => 10.0, 'read_write_timeout' => 10.0,];
        }

        return AMQPStreamConnection::create_connection($servers, $options);
    }

   private function declareQueueWIthoutDLX($channel, $queueName) 
   {
      // TODO: declare the queue without DLX.
   }

    // TODO: rename to declareQueueWithDLX
    private function declareQueueAndDLX($channel, $queueName)
    {
        $channel->queue_declare(
            $queueName,
            false,
            true,
            false,
            false,
            false,
            new AMQPTable(
                array(
                    'x-dead-letter-exchange' => '',
                    'x-dead-letter-routing-key' => $queueName . '-retry'
                )
            )
        );
        $channel->queue_declare(
            $queueName . '-retry',
            false,
            true,
            false,
            false,
            false,
            new AMQPTable(
                array(
                    'x-dead-letter-exchange' => '',
                    'x-dead-letter-routing-key' => $queueName,
                    'x-message-ttl' => 1000 * 60 * 60
                )
            )
        );
    }

    public function publish($queueName, $message)
    {
        $connection = $this->getConnection();
        $channel = $connection->channel();
        $this->declareQueueAndDLX($channel, $queueName);
        $msgOptions = array(
            'content_type' => 'application/json',
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
        );
        $msg = new AMQPMessage($message, $msgOptions);
        $channel->basic_publish($msg, '', $queueName);
        $channel->close();
        $connection->close();
    }

    public function consume($timeout, $queueName, $callback, $resetTimeoutOnReceive = false)
    {
        $startTime = time();
        $fn = function ($msg) use ($callback, $timeout, $startTime, $resetTimeoutOnReceive) {
            if ($resetTimeoutOnReceive) {
                $startTime = time();
            }
            $callback($timeout, $startTime, $msg);
        };

        $connection = $this->getConnection();
        $channel = $connection->channel();
        // TODO: check which one to call - declareQueueWithDLX or declareQueueWithoutDLX
        $this->declareQueueAndDLX($channel, $queueName);
        // TODO: the 10 should be an optional parameter of this method with default value as 10.
        $channel->basic_qos(null, 10, null);
        $channel->basic_consume($queueName, '', false, false, false, false, $fn);

        while ($channel->is_consuming()) {
            $channel->wait(null, true);
            if ($startTime + $timeout < time()) {
                break;
            }
        }

        $channel->close();
        $connection->close();
    }
}

Additional information

⚠️ 🚨 Add documentation and tests

Metadata

Assignees

No one assigned

    Labels

    enhancementNew feature or requestgitautoGitAuto label to trigger the app in a issue.good first issueGood for newcomershacktoberfestParticipation in the Hacktoberfest eventhelp wantedExtra attention is needed📝 documentationTasks related to writing or updating documentation🕔 high effortA task that can be completed in a few days🧪 testsTasks related to testing

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions