composer.json

{
  "require": {
    "php-amqplib/php-amqplib": "2.5.*"
  }
}

task.php

<?php
/**
 * task.php
 *
 * @author: cnx7 <zysafe@live.cn> 2017-03-13
 */

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// Publisher script: pushes ten persistent test messages onto the "test" queue.

// Open a connection to RabbitMQ and get a channel on it.
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// Declare the queue as persistent (third argument is true).
$channel->queue_declare('test', false, true, false, false);

for ($i = 0; $i < 10; $i++) {
    // Test payload: the message index followed by 0-5 dots, where every dot
    // stands for one second of execution time on the consumer side.
    // The dot count is drawn exactly once per message; calling rand() inside
    // a loop condition would re-roll on every iteration and skew the count
    // toward small values.
    $data = $i . str_repeat('.', rand(0, 5));

    // delivery_mode 2 marks the message as persistent.
    $msg = new AMQPMessage($data, array('delivery_mode' => 2));

    // Publish with an empty exchange name and 'test' as the routing key.
    $channel->basic_publish($msg, '', 'test');

    echo " [x] Sent ", $data, "\n";
}

$channel->close();
$connection->close();

worker.php

<?php
/**
 * worker.php
 *
 * @author: cnx7 <zysafe@live.cn> 2017-03-13
 */

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

// Consumer script: connect to RabbitMQ and open a channel.
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// Declare the queue as persistent (third argument is true).
$channel->queue_declare('test', false, true, false, false);

echo ' [*] Waiting for messages. To exit press CTRL+C' . "\n";

/**
 * Handles one delivered message: prints its body, sleeps one second for
 * every '.' found in the body, then acknowledges the delivery.
 */
$callback = function ($message) {
    $body = $message->body;
    echo " [x] Received " . $body . "\n";

    // Simulated work: one second of sleep per dot in the body.
    $seconds = substr_count($body, '.');
    sleep($seconds);
    echo " [x] Done" . "\n";

    // Send the acknowledgement to signal that processing has finished.
    $info = $message->delivery_info;
    $info['channel']->basic_ack($info['delivery_tag']);
};

// The second argument of basic_qos is 1: no more than one message is sent
// to this worker at the same time.
$channel->basic_qos(null, 1, null);

// Register $callback as the consumer of the 'test' queue.
$channel->basic_consume('test', '', false, false, false, false, $callback);

// Keep waiting on the channel while it still has registered callbacks.
// NOTE(review): the counter printed before each wait() looks like leftover
// debug output — confirm before removing it.
for ($waitCount = 0; count($channel->callbacks) > 0; $waitCount++) {
    echo $waitCount;
    $channel->wait();
}

$channel->close();
$connection->close();

可同时开启多个worker消费,中断退出无影响

标签: none

添加新评论