thinkphp6 消息队列
安装
composer require topthink/think-queue
配置
配置文件位于 config/queue.php
使用redis作为消息队列
<?php
return [
'default' => 'redis',
'connections' => [
'sync' => [
'type' => 'sync',
],
'database' => [
'type' => 'database',
'queue' => 'default',
'table' => 'jobs',
'connection' => null,
],
'redis' => [
'type' => 'redis',
'queue' => 'default',//默认队列名称
'host' => '127.0.0.1',//Redis主机IP地址
'port' => 6379,//Redis端口
'password' => '',//Redis密码
'select' => 1,//Redis数据库
'timeout' => 0,//Redis连接超时时间
'persistent' => false,//是否长连接
],
],
'failed' => [
'type' => 'none',
'table' => 'failed_jobs',
],
];
创建消费类
在app目录下创建目录job,创建类文件
<?php
namespace app\job;
use think\facade\Log;
use think\queue\Job;
class Test
{
public function fire(Job $job, $data)
{
// 处理业务逻辑返回为true表示消费成功,则删除队列
if($this->test($job->attempts())){
// 删除队列
$job->delete();
}else{
// 判断执行失败次数,到达设置值后删除消息队列
if ($job->attempts() >= 10) {
Log::channel('qxsp')->info('到达规定次数删除了');
// 删除队列
$job->delete();
}else{
Log::channel('qxsp')->info('继续执行');
// 重庆消息队列,重要:如果没有这样设置,默认的是1失败后1分钟执行一次,这样设置的话达到失败后隔多久执行下一次。官方的坑研究了好久。
$job->release(120);
}
}
}
// 处理业务逻辑
public function test($data)
{
Log::channel('qxsp')->info($data);
return false;
}
}
创建任务类
Queue::push($job, $data = '', $queue = null) 和Queue::later($delay, $job, $data = '', $queue = null) 两个方法,前者是立即执行,后者是在$delay秒后执行
$job 是任务名
命名空间是app\job的,比如上面的例子一,写Job1类名即可
其他的需要些完整的类名,比如上面的例子二,需要写完整的类名app\lib\job\Job2
如果一个任务类里有多个小任务的话,如上面的例子二,需要用@+方法名app\lib\job\Job2@task1、app\lib\job\Job2@task2
$data 是你要传到任务里的参数
$queue 队列名,指定这个任务是在哪个队列上执行,同下面监控队列的时候指定的队列名,可不填
以下为上面消费类例子
public function index()
{
Queue::push('Test', date("h:i:sa"), 'wu');
}
监听任务并执行
在根目录下执行
使用该语句:修改消费类代码可以实时更新无需重启
php think queue:listen
使用该语句:修改消费类代码不会实时更新,需要重启才能生效
php think queue:work
https://www.kancloud.cn/w13244855188/think-queue-wu