[Laravel 扩展包] 适用于 Laravel 的 AMQP 封装包
用于 Laravel 和 Lumen 的,作为消息发布和消费的 AMQP 组件,尤其适于 RabbitMQ 。
安装
Composer
在composer.json中添加以下依赖:
"bschmitt/laravel-amqp": "2.*" (Laravel >= 5.5)
"bschmitt/laravel-amqp": "1.*" (Laravel < 5.5)
composer update
或者
composer require bschmitt/laravel-amqp
集成
Lumen
在 Lumen 应用的根目录下创建 config 目录,复制 vendor/bschmitt/laravel-amqp/config/amqp.php 中的内容到 config/amqp.php 。
调整你需要的属性
return [
'use' => 'production',
'properties' => [
'production' => [
'host' => 'localhost',
'port' => 5672,
'username' => 'username',
'password' => 'password',
'vhost' => '/',
'exchange' => 'amq.topic',
'exchange_type' => 'topic',
'consumer_tag' => 'consumer',
'ssl_options' => [], // See https://secure.php.net/manual/en/context.ssl.php
'connect_options' => [], // See https://github.com/php-amqplib/php-amqplib/blob/master/PhpAmqpLib/Connection/AMQPSSLConnection.php
'queue_properties' => ['x-ha-policy' => ['S', 'all']],
'exchange_properties' => [],
'timeout' => 0
],
],
];
在 bootstrap/app.php 中注册 Lumen 服务提供者:
/*
|--------------------------------------------------------------------------
| Register Service Providers
|--------------------------------------------------------------------------
*/
//...
$app->configure('amqp');
$app->register(Bschmitt\Amqp\LumenServiceProvider::class);
//...
为 Lumen 5.2+ 添加 Facade 支持
//...
$app->withFacades(true, [
'Bschmitt\Amqp\Facades\Amqp' => 'Amqp',
]);
//...
Laravel
打开 config/app.php 并添加服务提供者及别名:
Bschmitt\Amqp\AmqpServiceProvider',
'Amqp' => 'Bschmitt\Amqp\Facades\Amqp',
发布消息
使用路由键发布消息
Amqp::publish('routing-key', 'message');
使用路由键发布消息并创建队列
Amqp::publish('routing-key', 'message' , ['queue' => 'queue-name']);
使用路由键发布消息并重写属性
Amqp::publish('routing-key', 'message' , ['exchange' => 'amq.direct']);
消费消息
消费消息,确认回执并在无消息后停止
Amqp::consume('queue-name', function ($message, $resolver) {
var_dump($message->body);
$resolver->acknowledge($message);
$resolver->stopWhenProcessed();
});
持续消费消息
Amqp::consume('queue-name', function ($message, $resolver) {
var_dump($message->body);
$resolver->acknowledge($message);
});
使用自定义设置消费消息
Amqp::consume('queue-name', function ($message, $resolver) {
var_dump($message->body);
$resolver->acknowledge($message);
}, [
'timeout' => 2,
'vhost' => 'vhost3'
]);
扇形图示例
发布消息
\Amqp::publish('', 'message' , [
'exchange_type' => 'fanout',
'exchange' => 'amq.fanout',
]);
消费消息
\Amqp::consume('', function ($message, $resolver) {
var_dump($message->body);
$resolver->acknowledge($message);
}, [
'routing' => '',
'exchange' => 'amq.fanout',
'exchange_type' => 'fanout',
'queue_force_declare' => true,
'queue_exclusive' => true,
'persistent' => true // required if you want to listen forever
]);