垃圾回收

简称GC。顾名思义,就是废物利用的意思。
说垃圾回收机制之前,先接触一下内存泄露。

内存泄露

内存泄漏(Memory Leak)是指程序中已动态分配的堆内存由于某种原因程序未释放或无法释放,造成系统内存的浪费,导致程序运行速度减慢甚至系统崩溃等严重后果。

C语言的垃圾回收机制

如果用过C语言,那么申请内存的方式是malloc或者是calloc,然后你用完这个内存后,一定不要忘了用free函数去释放掉,这就是手动垃圾回收,一般都是使用这种方式。

PHP的自动垃圾回收机制

我们都知道PHP是C语言实现的。你想想如何用C语言实现对一个变量的统计及释放?C语言是如何实现一个变量,从声明开始到没人使用了,就把这个变量所占的内存释放掉(被垃圾回收)。

PHP进行内存管理的核心算法一共两项:
一是引用计数,二是写时拷贝

声明一个PHP变量的时候,C语言就在底层生成了一个叫做zval的struct,如下:

zval {
    string "a" // 变量名为a
    value zend_value // 变量的值,联合体
    type string // 变量是字符串类型
}

zval struct结构体

  1. PHP $a 的变量名
  2. PHP $a 的变量类型
  3. PHP 变量 $a 的zend_value联合体
    如果给变量赋值了,比如"hello world",那么C语言就在底层再生成一个叫做zend_value的union
zend_value {
    string "hello world" // 值内容
    refcount 1 // 引用计数
}

zend_value联合体

  1. 保存PHP $a 的变量值
  2. 记录PHP $a 变量的引用计数

何为引用计数?

$a = "hello world";
echo xdebug_debug_zval('a'); // refcount=1
$b = $a; // $b引用变量$a
echo xdebug_debug_zval('a'); // refcount=2
$c = $a; // $c引用变量$a
echo xdebug_debug_zval('a'); // refcount=3
unset($c); // 删除了$c的引用
echo xdebug_debug_zval('a'); // refcount=2

何为拷贝复制?

$a = 'hello';
$b = $a; // $a赋值给$b的时候,并没有复制值
echo xdebug_debug_zval('a'); // $a的引用计数为2
$a = 'world'; // 当修改$a的值的时候,不得己进行复制,避免牵扯到$b
echo xdebug_debug_zval('a'); // $a的引用计数为1

垃圾回收机制:

当一个zval在被unset时,或者从一个函数中运行完毕出来(局部变量)等很多地方,都会产生zval与zend_value发生断开的行为,这个时候zend引擎需要检测的就是zend_value的refcount是否为0,如果为0则直接KO free空出内容来。如果zend_value的refcount不为0,这个value就不能被释放,但是也不代表这个zend_value是清白的,因为此zend_value依然可能是垃圾。

  1. 当php变量的refcount=0时,变量$a就会被垃圾回收
  2. 当php变量的refcount>0时,变量$a也可能被认为是垃圾

什么情况会导致zend_value的refcount不为0,但是zend_value确是垃圾

$arr = [1];
$arr[] = &$arr;
unset($arr);

这种情况下,zend_value不会释放,但也不能放过它,不然会产生内存泄露,所以这会儿zend_value会被扔到一个叫做垃圾回收堆中,然后zend引擎会依次对垃圾回收堆中的这些zend_value进行二次检测,检测是不是由于上述两种情况造成的refcount为1但是自身却没有人在使用了,一旦确认是上述两种情况造成的,那么就会将zend_value彻底抹掉释放内存。

垃圾回收发生在什么时候

  1. FPM运行完毕后,一定会GC
  2. 运行过程中也会GC,内存都是即用即释放的

git reset --hard <commit_id>

git push origin HEAD --force

根据–soft –mixed –hard,会对working tree和index和HEAD进行重置:

git reset –mixed:不带任何参数的git reset,即时这种方式,它回退到某个版本,只保留源码,回退commit和index信息
git reset –soft:回退到某个版本,只回退了commit的信息,不会恢复到index file一级。如果还要提交,直接commit即可
git reset –hard:彻底回退到某个版本,本地的源码也会变为上一个版本的内容

Queue是一个先进先出的集合,在工作中经常会使用到。

但是难免会遇到这样一种情况:一个用户或某几个用户推了太多的任务在队列前排,其他新的用户可能只有很少一部分任务却得不到执行,在服务器资源有限、消费者执行速度有限的情况下,对后面的用户造成使用体验很差的情况(有的人推荐使用高低速队列分组,然而事实却并不能提供一个合理的阙值来区分任务组)。

例:用户 a,b,c,d,e 的任务以 A,B,C,D,E 来标记,他们各自推送了 8,9,4,5,2 条任务,正常的执行流程为(换行为更容易看清楚)

A A A A A A A A ->
B B B B B B B B B->
C C C C ->
D D D D D ->
E E

用户E虽然只有两条任务待处理,但是需要等待很久。

对此我们的解决方案是:修改队列执行顺序逻辑,以用户进行分组,在相同资源情况下,给所有用户一致和公平的体验。

A B C D E ->
A B C D E ->
A B C D ->
A B C D ->
A B D ->
A B ->
A B ->
A B ->
B

over,不想往后看的可以去拿代码了:
基于Yii2-Queue的Redis驱动修改


以下是源码分析

我们目前项目中使用的是Yii2-Queue,源码地址为https://github.com/yiisoft/yii2-queue,目前版本2.0.1。以下是对Yii2-Queue的Redis驱动执行流程分析及优化过程:

在程序中注册\yii\queue\redis\Queue,然后推送任务的调用方式:

Yii::$app->queue->push(new SomeJob());

跟踪得到push()方法继承于src/Queue.php中,建立任务调用所注册驱动程序的pushMessage($message, $ttr, $delay, $priority)方法,Redis驱动的代码在src/drivers/Queue.php中。

push源码注解:

    protected function pushMessage($message, $ttr, $delay, $priority)
    {
        // Redis驱动不支持作业优先级 如果有传值则抛出错误
        if ($priority !== null) {
            throw new NotSupportedException('Job priority is not supported in the driver.');
        }
        // 取到上条 message_id + 1 ,作为本条任务的ID
        $id = $this->redis->incr("$this->channel.message_id");
        // 以新ID将本条任务格存储到hash表 messages 中
        $this->redis->hset("$this->channel.messages", $id, "$ttr;$message");
        if (!$delay) {
            // 如果不需要等待执行 则将任务ID推到hash表 waiting 中
            $this->redis->lpush("$this->channel.waiting", $id);
        } else {
            // 如果需要等待执行 则推送到有序集合 delayed 中
            $this->redis->zadd("$this->channel.delayed", time() + $delay, $id);
        }
        // 返回任务ID
        return $id;
    }

reserve源码注解:

    /**
     * @param int $wait timeout
     * @return array|null payload
     */
    protected function reserve($wait)
    {
        // 将延迟和保留的作业移动到等待列表中 并锁定一秒
        if ($this->redis->set("$this->channel.moving_lock", true, 'NX', 'EX', 1)) {
            $this->moveExpired("$this->channel.delayed");
            $this->moveExpired("$this->channel.reserved");
        }
        // Find a new waiting message
        $id = null;
        if (!$wait) {
            // 从等待列表取1条任务ID待执行
            $id = $this->redis->rpop("$this->channel.waiting");
        } elseif ($result = $this->redis->brpop("$this->channel.waiting", $wait)) {
            $id = $result[1];
        }
        if (!$id) {
            return null;
        }
        // 根据任务ID取出任务
        $payload = $this->redis->hget("$this->channel.messages", $id);
        list($ttr, $message) = explode(';', $payload, 2);
        // 加入到作业列表
        $this->redis->zadd("$this->channel.reserved", time() + $ttr, $id);
        $attempt = $this->redis->hincrby("$this->channel.attempts", $id, 1);
        return [$id, $message, $ttr, $attempt];
    }

以上是关键的两段代码,经过修改逻辑如下:

    /**
     * @inheritdoc
     */
    protected function pushMessage($message, $ttr, $delay, $priority, $group)
    {
        if ($priority !== null) {
            throw new NotSupportedException('Job priority is not supported in the driver.');
        }

        $id = $this->redis->incr("$this->channel.message_id");
        $this->redis->hset("$this->channel.messages", $id, "$ttr;$message");
        if (!$delay) {
            $group ?
                $this->redis->lpush("$this->channel.waiting.$group", $id) :
                $this->redis->lpush("$this->channel.waiting", $id);
        } else {
            $this->redis->zadd("$this->channel.delayed", time() + $delay, $id);
        }

        return $id;
    }
    /**
     * @param int $wait timeout
     * @return array|null payload
     */
    protected function reserve($wait)
    {
        // Moves delayed and reserved jobs into waiting list with lock for one second
        if ($this->redis->set("$this->channel.moving_lock", true, 'NX', 'EX', 1)) {
            $this->moveExpired("$this->channel.delayed");
            $this->moveExpired("$this->channel.reserved");
        }

        // 如果列表为空  则遍历所有的子队列 每个拿一条数据放到列表
        if(!$this->redis->llen("$this->channel.waiting")){
            $keys = $this->redis->keys("$this->channel.waiting.*");
            foreach ($keys as $key){
                $id = $this->redis->rpop($key);
                $this->redis->lpush("$this->channel.waiting", $id);
            }
        }

        // Find a new waiting message
        $id = null;
        if (!$wait) {
            $id = $this->redis->rpop("$this->channel.waiting");
        } elseif ($result = $this->redis->brpop("$this->channel.waiting", $wait)) {
            $id = $result[1];
        }
        if (!$id) {
            return null;
        }

        $payload = $this->redis->hget("$this->channel.messages", $id);
        list($ttr, $message) = explode(';', $payload, 2);
        $this->redis->zadd("$this->channel.reserved", time() + $ttr, $id);
        $attempt = $this->redis->hincrby("$this->channel.attempts", $id, 1);

        return [$id, $message, $ttr, $attempt];
    }