分类 PHP 下的文章

Queue是一个先进先出的集合,在工作中经常会使用到。

但是难免会遇到这样一种情况:一个用户或某几个用户推了太多的任务在队列前排,其他新的用户可能只有很少一部分任务却得不到执行,在服务器资源有限、消费者执行速度有限的情况下,对后面的用户造成使用体验很差的情况(有的人推荐使用高低速队列分组,然而事实却并不能提供一个合理的阙值来区分任务组)。

例:用户 a,b,c,d,e 的任务以 A,B,C,D,E 来标记,他们各自推送了 8,9,4,5,2 条任务,正常的执行流程为(换行为更容易看清楚)

A A A A A A A A ->
B B B B B B B B B->
C C C C ->
D D D D D ->
E E

用户E虽然只有两条任务待处理,但是需要等待很久。

对此我们的解决方案是:修改队列执行顺序逻辑,以用户进行分组,在相同资源情况下,给所有用户一致和公平的体验。

A B C D E ->
A B C D E ->
A B C D ->
A B C D ->
A B D ->
A B ->
A B ->
A B ->
B

over,不想往后看的可以去拿代码了:
基于Yii2-Queue的Redis驱动修改: https://github.com/cnx7/yii2-queue


以下是源码分析

我们目前项目中使用的是Yii2-Queue,源码地址为https://github.com/yiisoft/yii2-queue,目前版本2.0.1。以下是对Yii2-Queue的Redis驱动执行流程分析及优化过程:

在程序中注册\yii\queue\redis\Queue,然后推送任务的调用方式:

Yii::$app->queue->push(new SomeJob());

跟踪得到push()方法继承于src/Queue.php中,建立任务调用所注册驱动程序的pushMessage($message, $ttr, $delay, $priority)方法,Redis驱动的代码在src/drivers/Queue.php中。

push源码注解:

    protected function pushMessage($message, $ttr, $delay, $priority)
    {
        // Redis驱动不支持作业优先级 如果有传值则抛出错误
        if ($priority !== null) {
            throw new NotSupportedException('Job priority is not supported in the driver.');
        }
        // 取到上条 message_id + 1 ,作为本条任务的ID
        $id = $this->redis->incr("$this->channel.message_id");
        // 以新ID将本条任务格存储到hash表 messages 中
        $this->redis->hset("$this->channel.messages", $id, "$ttr;$message");
        if (!$delay) {
            // 如果不需要等待执行 则将任务ID推到hash表 waiting 中
            $this->redis->lpush("$this->channel.waiting", $id);
        } else {
            // 如果需要等待执行 则推送到有序集合 delayed 中
            $this->redis->zadd("$this->channel.delayed", time() + $delay, $id);
        }
        // 返回任务ID
        return $id;
    }

reserve源码注解:

    /**
     * @param int $wait timeout
     * @return array|null payload
     */
    protected function reserve($wait)
    {
        // 将延迟和保留的作业移动到等待列表中 并锁定一秒
        if ($this->redis->set("$this->channel.moving_lock", true, 'NX', 'EX', 1)) {
            $this->moveExpired("$this->channel.delayed");
            $this->moveExpired("$this->channel.reserved");
        }
        // Find a new waiting message
        $id = null;
        if (!$wait) {
            // 从等待列表取1条任务ID待执行
            $id = $this->redis->rpop("$this->channel.waiting");
        } elseif ($result = $this->redis->brpop("$this->channel.waiting", $wait)) {
            $id = $result[1];
        }
        if (!$id) {
            return null;
        }
        // 根据任务ID取出任务
        $payload = $this->redis->hget("$this->channel.messages", $id);
        list($ttr, $message) = explode(';', $payload, 2);
        // 加入到作业列表
        $this->redis->zadd("$this->channel.reserved", time() + $ttr, $id);
        $attempt = $this->redis->hincrby("$this->channel.attempts", $id, 1);
        return [$id, $message, $ttr, $attempt];
    }

以上是关键的两段代码,经过修改逻辑如下:

    /**
     * @inheritdoc
     */
    protected function pushMessage($message, $ttr, $delay, $priority, $group)
    {
        if ($priority !== null) {
            throw new NotSupportedException('Job priority is not supported in the driver.');
        }

        $id = $this->redis->incr("$this->channel.message_id");
        $this->redis->hset("$this->channel.messages", $id, "$ttr;$message");
        if (!$delay) {
            $group ?
                $this->redis->lpush("$this->channel.waiting.$group", $id) :
                $this->redis->lpush("$this->channel.waiting", $id);
        } else {
            $this->redis->zadd("$this->channel.delayed", time() + $delay, $id);
        }

        return $id;
    }
    /**
     * @param int $wait timeout
     * @return array|null payload
     */
    protected function reserve($wait)
    {
        // Moves delayed and reserved jobs into waiting list with lock for one second
        if ($this->redis->set("$this->channel.moving_lock", true, 'NX', 'EX', 1)) {
            $this->moveExpired("$this->channel.delayed");
            $this->moveExpired("$this->channel.reserved");
        }

        // 如果列表为空  则遍历所有的子队列 每个拿一条数据放到列表
        if(!$this->redis->llen("$this->channel.waiting")){
            $keys = $this->redis->keys("$this->channel.waiting.*");
            foreach ($keys as $key){
                $id = $this->redis->rpop($key);
                $this->redis->lpush("$this->channel.waiting", $id);
            }
        }

        // Find a new waiting message
        $id = null;
        if (!$wait) {
            $id = $this->redis->rpop("$this->channel.waiting");
        } elseif ($result = $this->redis->brpop("$this->channel.waiting", $wait)) {
            $id = $result[1];
        }
        if (!$id) {
            return null;
        }

        $payload = $this->redis->hget("$this->channel.messages", $id);
        list($ttr, $message) = explode(';', $payload, 2);
        $this->redis->zadd("$this->channel.reserved", time() + $ttr, $id);
        $attempt = $this->redis->hincrby("$this->channel.attempts", $id, 1);

        return [$id, $message, $ttr, $attempt];
    }

手机拍照上传的文件有时候方向会变,文件EXIF信息引起的,有的浏览器不支持解析EXIF就会出现问题,包括一些图片压缩软件/插件/中间件,比如Nginx的image_filter组件以及ImageOptim之类的软件。

解决方案:上传图片检测图片是否存在EXIF信息的旋转标记,清空标记并生成新的图片。

建议跳过GIF图片的处理

/**
     * 图片处理/压缩
     * @param $path string 原图片路径
     * @param string $new_path string 新的图片路径,为空则覆盖原图
     * @param int $cr 压缩率,为空则是70%
     * @return bool
     */
function imgDeal($path, $new_path = '', $cr = 70) {
    $new_path = $new_path ? $new_path : $path;

    $exif = exif_read_data($path, 0, true);


    $data = @getimagesize($path);
    if (!$data) return false;

    $w = $data[0];
    $h = $data[1];

    $xz = 0;
    if ($exif && !empty($exif['IFD0'])) {
        if (!empty($exif['IFD0']['Orientation'])) {
            switch ($exif['IFD0']['Orientation']) {
                case 3:
                    $xz = 180;
                    break;
                case 6:
                    $xz = -90;
                    $w = $data[1];
                    $h = $data[0];
                    break;
                case 8:
                    $xz = 90;
                    $w = $data[1];
                    $h = $data[0];
                    break;
                default:
                    # code...
                    break;
            }
        }
    }

    //读取旧图片
    $src_f = '';
    switch ($data[2]) {
        case 1:
            $src_f = imagecreatefromgif($path);
            break;
        case 2:
            $src_f = imagecreatefromjpeg($path);
            break;
        case 3:
            $src_f = imagecreatefrompng($path);
            break;
    }
    if ($src_f == "") return false;

    $rotate = @imagerotate($src_f, $xz, 0);

    $newim = imagecreatetruecolor($w,$h);

    imagecopyresampled($newim, $rotate, 0, 0, 1, 1, $w, $h, $w - 3, $h - 3);

    if (!imagejpeg($newim, $new_path, $cr)) return false;
    @imagedestroy($rotate);
    @imagedestroy($newim);

    return true;
}

PHP Warning:  PHP Startup: Unable to load dynamic library '/usr/local/opt/php56-mcrypt/mcrypt.so' - dlopen(/usr/local/opt/php56-mcrypt/mcrypt.so, 9): Library not loaded: /usr/local/opt/libtool/lib/libltdl.7.dylib
  Referenced from: /usr/local/opt/php56-mcrypt/mcrypt.so
  Reason: image not found in Unknown on line 0

解决方案

brew install libtool

composer.json

{
  "require": {
    "php-amqplib/php-amqplib": "2.5.*"
  }
}

task.php

<?php
/**
 * task.php
 *
 * @author: cnx7 <zysafe@live.cn> 2017-03-13
 */

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// 建立到RabbitMQ的连接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// 声明持久化队列 第三个参数为true
$channel->queue_declare('test', false, true, false, false);

$data = implode(' ', array_slice($argv, 1));
if (empty($data)) $data = "Hello World!";

for ($i = 0; $i < 10; $i++) {
    // 测试数据 每一个.代表1秒执行时间
    $str = '';
    for ($j = 0; $j < rand(0, 5); $j++) $str .= '.';
    $data = $i . $str;
    // 设置持久化消息 delivery_mode 为2
    $msg = new AMQPMessage($data,
        array('delivery_mode' => 2) # make message persistent
    );

    $channel->basic_publish($msg, '', 'test');

    echo " [x] Sent ", $data, "\n";

}
$channel->close();
$connection->close();

worker.php

<?php
/**
 * worker.php
 *
 * @author: cnx7 <zysafe@live.cn> 2017-03-13
 */

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// 持久化队列 第三个参数为true
$channel->queue_declare('test', false, true, false, false);

echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

$callback = function ($msg) {
    echo " [x] Received ", $msg->body, "\n";
    sleep(substr_count($msg->body, '.'));
    echo " [x] Done", "\n";

    // 返回响应 通知已结束
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

// basic_qos 的第二个参数设置为1 同时不发送超过1条消息到worker
$channel->basic_qos(null, 1, null);

$channel->basic_consume('test', '', false, false, false, false, $callback);

$i = 0;
while (count($channel->callbacks)) {
    echo $i++;
    $channel->wait();
}

$channel->close();
$connection->close();

可同时开启多个worker消费,中断退出无影响

1、如果能将类的方法定义成static,就尽量定义成static,它的速度会提升将近4倍。

2、$row[’id’] 的速度是$row[id]的7倍。

3、echo 比 print 快,并且使用echo的多重参数(译注:指用逗号而不是句点)代替字符串连接,比如echo $str1,$str2。

4、在执行for循环之前确定最大循环数,不要每循环一次都计算最大值,最好运用foreach代替。

5、注销那些不用的变量尤其是大数组,以便释放内存。

6、尽量避免使用__get,__set,__autoload。

7、require_once()代价昂贵。

8、include文件时尽量使用绝对路径,因为它避免了PHP去include_path里查找文件的速度,解析操作系统路径所需的时间会更少。

9、如果你想知道脚本开始执行(译注:即服务器端收到客户端请求)的时刻,使用$_SERVER[‘REQUEST_TIME’]要好于time()。

10、函数代替正则表达式完成相同功能。

11、str_replace函数比preg_replace函数快,但strtr函数的效率是str_replace函数的四倍。

12、如果一个字符串替换函数,可接受数组或字符作为参数,并且参数长度不太长,那么可以考虑额外写一段替换代码,使得每次传递参数是一个字符,而不是只写一行代码接受数组作为查询和替换的参数。

13、使用选择分支语句(译注:即switch case)好于使用多个if,else if语句。

14、用@屏蔽错误消息的做法非常低效,极其低效。

15、打开apache的mod_deflate模块,可以提高网页的浏览速度。

16、数据库连接当使用完毕时应关掉,不要用长连接。

17、错误消息代价昂贵。

18、在方法中递增局部变量,速度是最快的。几乎与在函数中调用局部变量的速度相当。

19、递增一个全局变量要比递增一个局部变量慢2倍。

20、递增一个对象属性(如:$this->prop++)要比递增一个局部变量慢3倍。