composer.json#
{
"require": {
"php-amqplib/php-amqplib": "2.5.*"
}
}
task.php#
<?php
/**
* task.php
*
* @author: R2A <zysafe@live.cn> 2017-03-13
*/
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// 建立到RabbitMQ的连接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
// 声明持久化队列 第三个参数为true
$channel->queue_declare('test', false, true, false, false);
$data = implode(' ', array_slice($argv, 1));
if (empty($data)) $data = "Hello World!";
for ($i = 0; $i < 10; $i++) {
// 测试数据 每一个.代表1秒执行时间
$str = '';
for ($j = 0; $j < rand(0, 5); $j++) $str .= '.';
$data = $i . $str;
// 设置持久化消息 delivery_mode 为2
$msg = new AMQPMessage($data,
array('delivery_mode' => 2) # make message persistent
);
$channel->basic_publish($msg, '', 'test');
echo " [x] Sent ", $data, "\n";
}
$channel->close();
$connection->close();
worker.php#
<?php
/**
* worker.php
*
* @author: R2A <zysafe@live.cn> 2017-03-13
*/
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
// 持久化队列 第三个参数为true
$channel->queue_declare('test', false, true, false, false);
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
$callback = function ($msg) {
echo " [x] Received ", $msg->body, "\n";
sleep(substr_count($msg->body, '.'));
echo " [x] Done", "\n";
// 返回响应 通知已结束
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
// basic_qos 的第二个参数设置为1 同时不发送超过1条消息到worker
$channel->basic_qos(null, 1, null);
$channel->basic_consume('test', '', false, false, false, false, $callback);
$i = 0;
while (count($channel->callbacks)) {
echo $i++;
$channel->wait();
}
$channel->close();
$connection->close();
...