Queue是一个先进先出的集合,在工作中经常会使用到。

但是难免会遇到这样一种情况:一个用户或某几个用户推了太多的任务在队列前排,其他新的用户可能只有很少一部分任务却得不到执行,在服务器资源有限、消费者执行速度有限的情况下,对后面的用户造成使用体验很差的情况(有的人推荐使用高低速队列分组,然而事实却并不能提供一个合理的阙值来区分任务组)。

例:用户 a,b,c,d,e 的任务以 A,B,C,D,E 来标记,他们各自推送了 8,9,4,5,2 条任务,正常的执行流程为(换行为更容易看清楚)

A A A A A A A A ->
B B B B B B B B B->
C C C C ->
D D D D D ->
E E

用户E虽然只有两条任务待处理,但是需要等待很久。

对此我们的解决方案是:修改队列执行顺序逻辑,以用户进行分组,在相同资源情况下,给所有用户一致和公平的体验。

A B C D E ->
A B C D E ->
A B C D ->
A B C D ->
A B D ->
A B ->
A B ->
A B ->
B

over,不想往后看的可以去拿代码了:
基于Yii2-Queue的Redis驱动修改


以下是源码分析

我们目前项目中使用的是Yii2-Queue,源码地址为https://github.com/yiisoft/yii2-queue,目前版本2.0.1。以下是对Yii2-Queue的Redis驱动执行流程分析及优化过程:

在程序中注册\yii\queue\redis\Queue,然后推送任务的调用方式:

Yii::$app->queue->push(new SomeJob());

跟踪得到push()方法继承于src/Queue.php中,建立任务调用所注册驱动程序的pushMessage($message, $ttr, $delay, $priority)方法,Redis驱动的代码在src/drivers/Queue.php中。

push源码注解:

    protected function pushMessage($message, $ttr, $delay, $priority)
    {
        // Redis驱动不支持作业优先级 如果有传值则抛出错误
        if ($priority !== null) {
            throw new NotSupportedException('Job priority is not supported in the driver.');
        }
        // 取到上条 message_id + 1 ,作为本条任务的ID
        $id = $this->redis->incr("$this->channel.message_id");
        // 以新ID将本条任务格存储到hash表 messages 中
        $this->redis->hset("$this->channel.messages", $id, "$ttr;$message");
        if (!$delay) {
            // 如果不需要等待执行 则将任务ID推到hash表 waiting 中
            $this->redis->lpush("$this->channel.waiting", $id);
        } else {
            // 如果需要等待执行 则推送到有序集合 delayed 中
            $this->redis->zadd("$this->channel.delayed", time() + $delay, $id);
        }
        // 返回任务ID
        return $id;
    }

reserve源码注解:

    /**
     * @param int $wait timeout
     * @return array|null payload
     */
    protected function reserve($wait)
    {
        // 将延迟和保留的作业移动到等待列表中 并锁定一秒
        if ($this->redis->set("$this->channel.moving_lock", true, 'NX', 'EX', 1)) {
            $this->moveExpired("$this->channel.delayed");
            $this->moveExpired("$this->channel.reserved");
        }
        // Find a new waiting message
        $id = null;
        if (!$wait) {
            // 从等待列表取1条任务ID待执行
            $id = $this->redis->rpop("$this->channel.waiting");
        } elseif ($result = $this->redis->brpop("$this->channel.waiting", $wait)) {
            $id = $result[1];
        }
        if (!$id) {
            return null;
        }
        // 根据任务ID取出任务
        $payload = $this->redis->hget("$this->channel.messages", $id);
        list($ttr, $message) = explode(';', $payload, 2);
        // 加入到作业列表
        $this->redis->zadd("$this->channel.reserved", time() + $ttr, $id);
        $attempt = $this->redis->hincrby("$this->channel.attempts", $id, 1);
        return [$id, $message, $ttr, $attempt];
    }

以上是关键的两段代码,经过修改逻辑如下:

    /**
     * @inheritdoc
     */
    protected function pushMessage($message, $ttr, $delay, $priority, $group)
    {
        if ($priority !== null) {
            throw new NotSupportedException('Job priority is not supported in the driver.');
        }

        $id = $this->redis->incr("$this->channel.message_id");
        $this->redis->hset("$this->channel.messages", $id, "$ttr;$message");
        if (!$delay) {
            $group ?
                $this->redis->lpush("$this->channel.waiting.$group", $id) :
                $this->redis->lpush("$this->channel.waiting", $id);
        } else {
            $this->redis->zadd("$this->channel.delayed", time() + $delay, $id);
        }

        return $id;
    }
    /**
     * @param int $wait timeout
     * @return array|null payload
     */
    protected function reserve($wait)
    {
        // Moves delayed and reserved jobs into waiting list with lock for one second
        if ($this->redis->set("$this->channel.moving_lock", true, 'NX', 'EX', 1)) {
            $this->moveExpired("$this->channel.delayed");
            $this->moveExpired("$this->channel.reserved");
        }

        // 如果列表为空  则遍历所有的子队列 每个拿一条数据放到列表
        if(!$this->redis->llen("$this->channel.waiting")){
            $keys = $this->redis->keys("$this->channel.waiting.*");
            foreach ($keys as $key){
                $id = $this->redis->rpop($key);
                $this->redis->lpush("$this->channel.waiting", $id);
            }
        }

        // Find a new waiting message
        $id = null;
        if (!$wait) {
            $id = $this->redis->rpop("$this->channel.waiting");
        } elseif ($result = $this->redis->brpop("$this->channel.waiting", $wait)) {
            $id = $result[1];
        }
        if (!$id) {
            return null;
        }

        $payload = $this->redis->hget("$this->channel.messages", $id);
        list($ttr, $message) = explode(';', $payload, 2);
        $this->redis->zadd("$this->channel.reserved", time() + $ttr, $id);
        $attempt = $this->redis->hincrby("$this->channel.attempts", $id, 1);

        return [$id, $message, $ttr, $attempt];
    }