2017年3月

composer.json

{
  "require": {
    "php-amqplib/php-amqplib": "2.5.*"
  }
}

task.php

<?php
/**
 * task.php
 *
 * @author: cnx7 <zysafe@live.cn> 2017-03-13
 */

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// 建立到RabbitMQ的连接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// 声明持久化队列 第三个参数为true
$channel->queue_declare('test', false, true, false, false);

$data = implode(' ', array_slice($argv, 1));
if (empty($data)) $data = "Hello World!";

for ($i = 0; $i < 10; $i++) {
    // 测试数据 每一个.代表1秒执行时间
    $str = '';
    for ($j = 0; $j < rand(0, 5); $j++) $str .= '.';
    $data = $i . $str;
    // 设置持久化消息 delivery_mode 为2
    $msg = new AMQPMessage($data,
        array('delivery_mode' => 2) # make message persistent
    );

    $channel->basic_publish($msg, '', 'test');

    echo " [x] Sent ", $data, "\n";

}
$channel->close();
$connection->close();

worker.php

<?php
/**
 * worker.php
 *
 * @author: cnx7 <zysafe@live.cn> 2017-03-13
 */

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// 持久化队列 第三个参数为true
$channel->queue_declare('test', false, true, false, false);

echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

$callback = function ($msg) {
    echo " [x] Received ", $msg->body, "\n";
    sleep(substr_count($msg->body, '.'));
    echo " [x] Done", "\n";

    // 返回响应 通知已结束
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

// basic_qos 的第二个参数设置为1 同时不发送超过1条消息到worker
$channel->basic_qos(null, 1, null);

$channel->basic_consume('test', '', false, false, false, false, $callback);

$i = 0;
while (count($channel->callbacks)) {
    echo $i++;
    $channel->wait();
}

$channel->close();
$connection->close();

可同时开启多个worker消费,中断退出无影响

package main

import (
    "fmt"
    "sync"
    "time"
)

func f(wg *sync.WaitGroup, val string) {
    fmt.Println(val, "begin")
    time.Sleep(2 * time.Second) // 这里处理需要3秒钟
    fmt.Printf("Finished: %v - %v\n", val, time.Now())
    wg.Done()
}

func main() {

    t1 := time.Now()

    var wg sync.WaitGroup

    wg.Add(3) // 我们要做三个并发任务,这里添加3个协程

    go f(&wg, "goroutine A") // 第一个任务:实行函数 f

    // 第二个任务,匿名闭包函数
    go func(wg *sync.WaitGroup, val string) {
        fmt.Println(val, "begin")
        time.Sleep(3 * time.Second) // 处理需要3秒钟
        fmt.Printf("Finished: %v - %v\n", val, time.Now())
        wg.Done()
    }(&wg, "goroutine B") // 调用匿名函数

    go f(&wg, "goroutine C") // 第三个任务,调用f 函数

    wg.Wait() // 等待所有任务完成

    fmt.Printf("Finished all goroutines: %v\n", time.Now())

    t2 := time.Now()
    fmt.Println("消耗时间:", t2.Sub(t1), "秒")
}

服务起停

sudo /etc/init.d/supervisord {start|stop|status|restart|reload|force-reload|condrestart}

配置文件路径

/etc/supervisord.conf

配置项

[progran:example]              ;项目名
command=/bin/echo              ;supervisor启动时将要开启的进程。相对或绝对路径均可。若是相对路径则会从supervisord的$PATH变中查找。命令可带参数。  
priority=999                   ;指明进程启动和关闭的顺序。低优先级表明进程启动时较先启动关闭时较后关闭。高优先级表明进程启动时启动时较后启动关闭时较先关闭。  
autostart=true                 ;是否随supervisord启动而启动  
autorestart=true               ;进程意外退出后是否自动重启  
startsecs=10                   ;进程持续运行多久才认为是启动成功  
startretries=3                 ;重启失败的连续重试次数  
exitcodes=0,2                  ;若autostart设置为unexpected且监控的进程并非因为supervisord停止而退出,那么如果进程的退出码不在exitcode列表中supervisord将重启进程  
stopsignal=QUIT                ;杀进程的信号  
stopwaitsecs=10                ;向进程发出stopsignal后等待OS向supervisord返回SIGCHILD 的时间。若超时则supervisord将使用SIGKILL杀进程  

日志文件路径

/var/log/supervisor/supervisord.log