Author Ljz
日期 2020-12-30 09:49:05
原文 http://note.youdao.com/noteshare?id=6ada5364f23c8c515760e18e65ba964d&sub=80BA7E15B78C4C1484B68420B7EA034D

概述
消息队列 (Message Queue) 常用于解决并发系统中的资源一致性问题,提升系统的峰值的处理能力,同时保证消息的顺序性、可恢复性、必送达性,对应用进行解耦,或者实现异步通讯等。市面上的 MQ,即消息中间件应用有很多(例如:Kafka,RabbitMQ,Disque),也可以基于 Redis 来实现。

  1. 使用下载composer Redis队列包,不添加版本号则自动默认官方下载版本
 composer require topthink/think-queue 2.0.4
  1. 也可以到composer packagist官网下载对应的版本或者消息队列

  2. 搭建消息队列的存储环境,在config配置目录下的cache文件配置并且使用Redis服务,启动Redis服务后可以与Redis Desktop Manager结合使用,具体安装步骤不细说

<?php

return [
    'type'   => 'complex',
    'default'   =>  [
        'type'    =>  'file',
        'expire'=>  0, 
        'prefix'=>  'think',
        'path'  =>  '../runtime/cache/',
    ],
    'redis' =>  [
        'type'    =>  'redis',
        'host'    =>  '127.0.0.1',
        'port' => '6379',
        'password' => '123456',
        'select'     => 0,
        'expire'=>  0, 
        'prefix'=>  'tp_',
    ] 
];
  1. 配置消息队列的驱动,根据选择的存储方式, 在config配置目录下, 新建新的queue配置文件, 用了添加消息队列对应的驱动配置
<?php

return [
    'connector'  => 'Redis',          // 可选驱动类型:sync(默认)、Redis、database、topthink等其他自定义类型
    'expire'     => 0,              // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null
    'default'    => 'default',        // 默认的队列名称
    'host'       => '47.112.147.225',      // redis 主机ip
    'port'       => 6379,            // redis 端口
    'password'   => 'adasdQW231qwe',// redis 连接密码
    'select'     => 7,              // 使用哪一个 db,默认为 db0
    'timeout'    => 0,              // redis连接的超时时间
    'persistent' => false,           // 是否是长连接
];
  1. 创建消息并且推送消息, 首页我们在可以在common公共类文件中创建一个新的方法, 作为为某个消息队列公用的创建方法并且推送到该消息队列中处理的封装好的公用方法,推送到 helloJobQueue 队列
/** * [delayedQueueReplacementMsg 消息创建推送] * @param [type] $delay [延迟时间] * @param [type] $jobData [队列数量] * @return [type] $isPushed [加入状态] */
function delayedQueueMonitorMsg($delay,$jobData){ 
    // 1.当前任务将由哪个类来负责处理。
    $jobHandlerClassName  = 'app\index\controller\helloJobQueue';
    // 2.当前任务归属的队列名称,如果为新队列,会自动创建
    $jobQueueName     = "helloJobQueue";
    // 3.当前任务所需的业务数据 . 不能为 resource 类型,其他类型最终将转化为json形式的字符串
    // ( jobData 为对象时,需要在先在此处手动序列化,否则只存储其public属性的键值对)
    //延时队列在`$delay`秒后执行
    $isPushed = Queue::later($delay, $jobHandlerClassName , $jobData, $jobQueueName);
    // database 驱动时,返回值为 1|false ; redis 驱动时,返回值为 随机字符串|false
    return $isPushed;
}
  1. 消息进行消费, 编写Msg消费者类,用于处理 helloJobQueue 队列中的任务, 新增\app\index\contoller\helloJobQueue.php 消费者类,并编写其fire()方法
/** * @param job $job 当前的任务对象 * @param $data 发布任务时自定义的数据->jobData */
public function fire( job $job,$data ) { 
    // 有些消息在到达消费者时,可能已经不再需要执行了
    $isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data);

    // 判断条件是否满足
    if($isJobStillNeedToBeDone){ 
        $job->delete();
        return;
    }

    $isJobDone = $this->doHelloJob($data);

    if ($isJobDone['type'] == 'success') { 
        // 如果任务执行成功,删除任务
        echo 'success';
        $job->delete();
    }else if ($isJobDone['type'] == 'release'){ 
        // 也可以重新发布这个任务
        //$delay为延迟时间,表示该任务延迟$isJobDone['delay']秒后再执行
        $job->release($isJobDone['delay']);
    }else{ 
        // 如果任务执行失败,删除任务
        echo 'fail';
        $job->delete();
    }
}
  1. doHelloJob方法作为业务处理逻辑的方法
/** * 根据消息中的数据进行实际的业务处理... */
private function doHelloJob($data){ 
    // 主要的业务逻辑
}
  1. failed方法作为发送失败后写入系统日志中
public function failed($data){ 
    //打印日志
}
  1. 处理任务切换当前终端窗口的目录到项目根目录下,执行php think queue:work –queue helloJobQueue

注意: 启动或者停止服务的时候,必须要查看任务是否已经停止

总结:消息队列来处理解耦,异步消息,流量削峰填谷是非常有效的,也非常的实用

本文地址:https://blog.csdn.net/qq_44304677/article/details/111952858