anik/amqp wrapper for Laravel-ish frameworks.
Checkout the repository for example.
To install the package, run
composer require anik/laravel-amqp
The Anik\Laravel\Amqp\Providers\AmqpServiceProvider::class
service provider should automatically get registered. If
not, then you can manually add the service provider in your config/app.php
providers array:
'providers' => [
// ...
Anik\Laravel\Amqp\Providers\AmqpServiceProvider::class,
]
- Publish configuration with
php artisan vendor:publish --provider "Anik\Laravel\Amqp\Providers\AmqpServiceProvider"
command.
- Register
Anik\Laravel\Amqp\Providers\AmqpServiceProvider::class
service provider in yourbootstrap/app.php
file.
$app->register(Anik\Laravel\Amqp\Providers\AmqpServiceProvider::class);
-
Copy configuration
amqp.php
in your config directory fromvendor/anik/laravel-amqp/src/config/amqp.php
. -
Import your configuration using
$app->configure('amqp');
in yourbootstrap/app.php
.
- Register
Anik\Laravel\Amqp\Providers\AmqpServiceProvider::class
service provider in yourconfig/app.php
providers array:
'providers' => [
/// ...
Anik\Laravel\Amqp\Providers\AmqpServiceProvider::class,
]
- Copy configuration
amqp.php
in your config directory fromvendor/anik/laravel-amqp/src/config/amqp.php
.
In your config/amqp.php
, you can define multiple connections and use them from your code by pointing the connection
name.
amqp.default
denoting the default connection. Will be used if no connection is specified when producing or consuming messages.amqp.connections.*.connection.class
denoting the underlying Amqp connection to be used. By default, it uses lazy connection. You can change it to any implementation ofPhpAmqpLib\Connection\AbstractConnection
.amqp.connections.*.connection.hosts
can have multiple host configuration. Each host config must containhost
,port
,user
,password
keys. It can also containvhost
which is optional. Lazy connections cannot have more than one host configuration otherwise it'll throw error.- You can also pass optional array of parameters through
amqp.connections.*.connection.options
when creating an instance ofamqp.connections.*.connection.class
internally. amqp.connections.*.message
holds the default properties of a message when publishing.amqp.connections.*.exchange
holds the default properties of your exchange when publishing & consuming.amqp.connections.*.queue
holds the default properties of your queue when consuming.amqp.connections.*.consumer
holds the default properties of consumer when consuming.amqp.connections.*.qos
holds the default properties of QoS when consuming.
This package supports laravel octane by
default. To keep the AMQP connection alive,
you have to configure octane to warm
the connection, by adding 'amqp' to the warm array in octane configurations.
// config/octane.php
// ...
'warm' => [
// ...
'amqp', // <-- this line
],
The followings work the same.
use Anik\Amqp\ConsumableMessage;
use Anik\Laravel\Amqp\Facades\Amqp;
$messages = 'my message';
// $messages = ['my first message', 'my second message'];
// $messages = new Anik\Amqp\ProducibleMessage('my message');
// $messages = ['another message', new Anik\Amqp\ProducibleMessage('also another message')];
Amqp::publish($messages); // publishes to default connection
Amqp::connection('rabbitmq')->publish($messages); // publishes to rabbitmq connection
app('amqp')->publish($messages); // publishes to default connection
app('amqp')->connection('rabbitmq')->publish($messages); // publishes to rabbitmq connection
app()->make('amqp')->publish($messages); // publishes to default connection
app()->make('amqp')->connection('rabbitmq')->publish($messages); // publishes to rabbitmq connection
/** @var \Anik\Laravel\Amqp\AmqpManager $amqpManager */
$amqpManager->publish($messages); // publishes to default connection
$amqpManager->connection('rabbitmq')->publish($messages); // publishes to rabbitmq connection
Amqp::consume(function(ConsumableMessage $message) {
var_dump($message->getMessageBody());
$message->ack();
}); // consumes from default connection
Amqp::connection('rabbitmq')->consume(function(ConsumableMessage $message) {
var_dump($message->getMessageBody());
$message->ack();
}); // consumes from rabbitmq connection
app('amqp')->consume(function(ConsumableMessage $message) {
var_dump($message->getMessageBody());
$message->ack();
}); // consumes from default connection
app('amqp')->connection('rabbitmq')->consume(function(ConsumableMessage $message) {
var_dump($message->getMessageBody());
$message->ack();
}); // consumes from rabbitmq connection
app()->make('amqp')->consume(function(ConsumableMessage $message) {
var_dump($message->getMessageBody());
$message->ack();
}); // consumes from default connection
app()->make('amqp')->connection('rabbitmq')->consume(function(ConsumableMessage $message) {
var_dump($message->getMessageBody());
$message->ack();
}); // consumes from rabbitmq connection
/** @var \Anik\Laravel\Amqp\AmqpManager $amqpManager */
$amqpManager->consume(function(ConsumableMessage $message) {
var_dump($message->getMessageBody());
$message->ack();
}); // consumes from default connection
$amqpManager->connection('rabbitmq')->consume(function(ConsumableMessage $message) {
var_dump($message->getMessageBody());
$message->ack();
}); // consumes from rabbitmq connection
In this documentation, it'll use FACADE afterwards. If you're using Lumen, then you can use other approaches. The package doesn't require enabling Facade.
To publish messages,
use Anik\Laravel\Amqp\Facades\Amqp;
Amqp::publish($messages, $routingKey, $exchange, $options);
Amqp::connection('rabbitmq')->publish($messages, $routingKey, $exchange, $options);
$messages
Type:mixed
. Required. It can be a single message, or an array of messages of any scalar type or implementation ofAnik\Amqp\Producible
.$routingKey
Type:string
. Optional. Default:''
(empty string).$exchange
Type:null | Anik\Amqp\Exchanges\Exchange
. Optional. Default:null
.$options
Type:array
. Optional. Default:[]
.- Key
message
- Accepts:array
. Valid properties forPhpAmqpLib\Message\AMQPMessage
. - Key
exchange
- Accepts:array
. Refer toamqp.connections.*.exchange
. - Key
publish
- Accepts:array
. Refer toAnik\Amqp\Producer::publishBatch
- Key
- If any of the
$messages
is not an implementation ofAnik\Amqp\Producible
, then that message will be converted toAnik\Amqp\Producible
usingAnik\Amqp\ProducibleMessage
. - When converting to
Anik\Amqp\Producible
, it'll try to use$options['message']
as the message property. If not set, it'll then try to useamqp.connections.*.message
properties if available. - If
$exchange
is set tonull
, it'll check if$options['exchange']
is set or not. If not set, it'll then useamqp.connections.*.exchange
properties if available. - If
$options['publish']
is not set, it'll try to useamqp.connections.*.publish
properties if available.
To consume messages,
use Anik\Laravel\Amqp\Facades\Amqp;
Amqp::consume($handler, $bindingKey, $exchange, $queue, $qos , $options);
Amqp::connection('rabbitmq')->consume($handler, $bindingKey, $exchange, $queue, $qos , $options);
$handler
Type:callable | Anik\Amqp\Consumable
. Required.$bindingKey
Type:string
. Optional. Default:''
(empty string).$exchange
Type:null | Anik\Amqp\Exchanges\Exchange
. Optional. Default:null
.$queue
Type:null | Anik\Amqp\Queues\Queue
. Optional. Default:null
.$qos
Type:null | Anik\Amqp\Qos\Qos
. Optional. Default:null
.$options
Type:array
. Optional. Default:[]
.- Key
exchange
- Accepts:array
. Refer toamqp.connections.*.exchange
. - Key
queue
- Accepts:array
. Refer toamqp.connections.*.queue
. - Key
qos
- Accepts:array
. Refer toamqp.connections.*.qos
. - Key
consumer
- Accepts:array
. Refer toamqp.connections.*.consumer
. - Key
bind
- Accepts:array
. Refer toAnik\Amqp\Consumer::consume
- Key
- If
$handler
is not an implementation ofAnik\Amqp\Consumable
, then the handler will be converted toAnik\Amqp\Consumable
usingAnik\Amqp\ConsumableMessage
. - If
$exchange
is set tonull
, it'll check if$options['exchange']
is set or not. If not set, it'll then useamqp.connections.*.exchange
properties if available. - If
$queue
is set tonull
, it'll check if$options['queue']
is set or not. If not set, it'll then useamqp.connections.*.queue
properties if available. - If
$qos
is set tonull
, it'll check if$options['qos']
is set or not. If not set, it'll then useamqp.connections.*.qos
properties ifamqp.connections.*.qos.enabled
is set to a truthy value. - If
$options['bind']
is not set, it'll useamqp.connections.*.bind
properties if available. - If
$options['consumer']
is not set, it'll useamqp.connections.*.consumer
properties if available.
The package allows asserting a few scenarios. Before you can run those assertions, you'll need use Amqp::fake()
.
<?php
use Anik\Laravel\Amqp\Facades\Amqp;
use PHPUnit\Framework\TestCase;
class MyTest extends TestCase
{
public function testIfMessageWasProduced () {
Amqp::fake();
// ... Your code
Amqp::assertPublished();
// Amqp::assertPublished("my-message");
// Amqp::assertPublishedCount(5, "my-message");
// Amqp::assertPublished(Anik\Amqp\ProducibleMessage::class);
// Amqp::assertPublished(Anik\Amqp\Producible::class);
Amqp::assertPublishedOnConnection('rabbitmq');
}
}
Anik\Laravel\Amqp\Facades\Amqp::assertPublishedOnConnection(string $name)
- To check if at least one message was published on the connection$name
.Anik\Laravel\Amqp\Facades\Amqp::assertPublishedOnExchange(string $name)
- To check if at least one message was published on exchange$name
.Anik\Laravel\Amqp\Facades\Amqp::assertPublishedOnExchangeType(string $type)
- To check if at least one message was published on exchange type$type
.Anik\Laravel\Amqp\Facades\Amqp::assertPublishedWithRoutingKey(string $key)
- To check if at least one message was published with routing key$key
.Anik\Laravel\Amqp\Facades\Amqp::assertPublished($message = null)
- If
$message
isnull
, it will check if at least one message was published. - Otherwise, checks in the following order.
- If a message exactly matches the
$message
. - If a message exactly matches the
get_class($message)
. - If a message is an implementation of
$message
.
- If a message exactly matches the
- If
Anik\Laravel\Amqp\Facades\Amqp::assertNotPublished($message = null)
- If
$message
isnull
, it will check if no message was published. - Otherwise, checks in the following order.
- No message was published that exactly matches the
$message
. - No message was published that exactly matches the
get_class($message)
. - No message was published that is an implementation of
$message
.
- No message was published that exactly matches the
- If
Anik\Laravel\Amqp\Facades\Amqp::assertPublishedCount(int $count, $message = null)
- If
$message
isnull
, it will check if exactly$count
messages have been published. - Otherwise, checks in the following order.
- If a message exactly matches the
$message
. - If a message exactly matches the
get_class($message)
. - If a message is an implementation of
$message
.
- If a message exactly matches the
- If
Using Anik\Laravel\Amqp\Facades\Amqp::consume()
after Anik\Laravel\Amqp\Facades\Amqp::fake()
will throw exception.