各类知识收集,PHP技术分享与解决方案各类知识收集,PHP技术分享与解决方案各类知识收集,PHP技术分享与解决方案

Str Tom,为分享PHP技术和解决方案,贡献一份自己的力量!
收藏本站(不迷路),每天更新好文章!
当前位置:首页 > CMS教程 > PHP

PHP使用Beanstalkd实例详解

管理员 2023-09-05
PHP
122

PHP使用Beanstalkd实例详解

内容导读

收集整理的这篇技术教程文章主要介绍了PHP使用Beanstalkd实例详解,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含6376字,纯文字阅读大概需要10分钟

内容图文

有关Beanstalkd的基本概念,编译和yum的安装方法已经在上篇文章《Beanstalkd消息/任务队列的详解》中介绍了,今天练习下PHP使用Beanstalkd的过程,我选择的是使用Pheanstalk类来连接Beanstalkd

1.使用Composer安装Pheanstalk

composer require pda/pheanstalk

2.实现代码

php查看beanstalkd状态脚本Status.php

<?php/** * Created by PhpStorm. * User: jmsite.cn * Date: 2019/1/21 * Time: 10:32 */require "../vendor/autoload.php";use PheanstalkPheanstalk;$pheanstalk = new Pheanstalk('192.168.75.135',11300);print_r($pheanstalk->stats());

生产者代码Producter.php

<?php/** * Created by PhpStorm. * User: jmsite.cn * Date: 2019/1/20 * Time: 16:30 */require "../vendor/autoload.php";use PheanstalkPheanstalk;$pheanstalk = new Pheanstalk('192.168.75.135',11300);for ($i=0;$i<50;$i++){    $data = array(        'key' => 'testkey'.$i,        'value' => 'testvalue',        'time' => time(),    );    $ret = $pheanstalk->putInTube('test-tube', json_encode($data), Pheanstalk::DEFAULT_PRIORITY, Pheanstalk::DEFAULT_DELAY, Pheanstalk::DEFAULT_TTR);    var_dump($ret);}

消费者代码Consumer.php

<?php/** * Created by PhpStorm. * User: jmsite.cn * Date: 2019/1/20 * Time: 16:31 */set_time_limit(0);ini_set('default_socket_timeout', 900);require "../vendor/autoload.php";use PheanstalkPheanstalk;$pheanstalk = new Pheanstalk('192.168.75.135',11300);while (true){    $job = $pheanstalk        ->watch('test-tube')        ->ignore('default')        ->reserve();    if ($job){        sleep(2);        echo $job->getData();        echo "n";        $pheanstalk->delete($job);    }}

打开命令行/终端窗口,执行生产者,会向tube写入50条任务

PS E:repositoryworkbeanstalk> php .Producter.phpint(101)int(102)int(103)int(104)int(105)int(106)int(107)int(108)int(109)int(110)int(111)int(112)int(113)int(114)......

由此可见,$pheanstalk->putInTube成功后返回的是job的id

查看状态

PS E:repositoryworkbeanstalk> php Status.phpPheanstalkResponseArrayResponse Object(    [_name:PheanstalkResponseArrayResponse:private] => OK    [storage:ArrayObject:private] => Array        (            [current-jobs-urgent] => 0            [current-jobs-ready] => 50            [current-jobs-reserved] => 0            [current-jobs-delayed] => 0            [current-jobs-buried] => 0            ......

结果中显示处于ready待读取状态的job是50个

打开两个或以上命令行/终端窗口,执行消费者,模拟多消费者竞争

消费者1

PS E:repositoryworkbeanstalk> php .Consumer.php{"key":"testkey0","value":"testvalue","time":1548039103}{"key":"testkey1","value":"testvalue","time":1548039103}{"key":"testkey2","value":"testvalue","time":1548039103}{"key":"testkey4","value":"testvalue","time":1548039103}{"key":"testkey6","value":"testvalue","time":1548039103}{"key":"testkey8","value":"testvalue","time":1548039103}{"key":"testkey10","value":"testvalue","time":1548039103}{"key":"testkey12","value":"testvalue","time":1548039103}{"key":"testkey14","value":"testvalue","time":1548039103}{"key":"testkey16","value":"testvalue","time":1548039103}{"key":"testkey18","value":"testvalue","time":1548039103}{"key":"testkey20","value":"testvalue","time":1548039103}{"key":"testkey22","value":"testvalue","time":1548039103}{"key":"testkey24","value":"testvalue","time":1548039103}{"key":"testkey26","value":"testvalue","time":1548039103}{"key":"testkey28","value":"testvalue","time":1548039103}{"key":"testkey30","value":"testvalue","time":1548039103}{"key":"testkey32","value":"testvalue","time":1548039103}{"key":"testkey34","value":"testvalue","time":1548039103}{"key":"testkey36","value":"testvalue","time":1548039103}{"key":"testkey38","value":"testvalue","time":1548039103}{"key":"testkey40","value":"testvalue","time":1548039103}{"key":"testkey42","value":"testvalue","time":1548039103}{"key":"testkey44","value":"testvalue","time":1548039103}{"key":"testkey46","value":"testvalue","time":1548039103}{"key":"testkey48","value":"testvalue","time":1548039103}

消费者2

PS E:repositoryworkbeanstalk> php .Consumer.php{"key":"testkey3","value":"testvalue","time":1548039103}{"key":"testkey5","value":"testvalue","time":1548039103}{"key":"testkey7","value":"testvalue","time":1548039103}{"key":"testkey9","value":"testvalue","time":1548039103}{"key":"testkey11","value":"testvalue","time":1548039103}{"key":"testkey13","value":"testvalue","time":1548039103}{"key":"testkey15","value":"testvalue","time":1548039103}{"key":"testkey17","value":"testvalue","time":1548039103}{"key":"testkey19","value":"testvalue","time":1548039103}{"key":"testkey21","value":"testvalue","time":1548039103}{"key":"testkey23","value":"testvalue","time":1548039103}{"key":"testkey25","value":"testvalue","time":1548039103}{"key":"testkey27","value":"testvalue","time":1548039103}{"key":"testkey29","value":"testvalue","time":1548039103}{"key":"testkey31","value":"testvalue","time":1548039103}{"key":"testkey33","value":"testvalue","time":1548039103}{"key":"testkey35","value":"testvalue","time":1548039103}{"key":"testkey37","value":"testvalue","time":1548039103}{"key":"testkey39","value":"testvalue","time":1548039103}{"key":"testkey41","value":"testvalue","time":1548039103}{"key":"testkey43","value":"testvalue","time":1548039103}{"key":"testkey45","value":"testvalue","time":1548039103}{"key":"testkey47","value":"testvalue","time":1548039103}{"key":"testkey49","value":"testvalue","time":1548039103}

两个消费者竞争着完成了全部任务,由于我的beanstalkd启动时开启了binlog持久,所以beanstalkd重启后任务也不会丢失

3.需要注意的事项

1.创建job时,设置的超时时间Pheanstalk::DEFAULT_TTR一定要比消费者处理一个job的时间要长,否则job在超时之后会被tube更改为ready状态,被其他消费者获取,而此时当前消费者还在处理该job,这就出现了一个job被多个消费者重复执行的可怕现象

2.Pheanstalk的维护者发生了变化,在新版的Pheanstalk中是不支持长连接的,当客户端socket连接服务器时间超过php.ini中设置的default_socket_timeout时,如果未能从服务端tube获得job,连接将会被断开,所以消费者进程需要维护,以便在退出后可以重新开启进程,推荐使用supervisord维护消费者进程。

判断socket超时的代码

public function getLine($length = null)    {        $timeout = ini_get('default_socket_timeout');        $timer   = microtime(true);        do {            $data = isset($length) ?                $this->_wrapper()->fgets($this->_socket, $length) :                $this->_wrapper()->fgets($this->_socket);            if ($this->_wrapper()->feof($this->_socket)) {                throw new ExceptionSocketException('Socket closed by server!');            }            if (($data === false) && microtime(true) - $timer > $timeout) {                $this->disconnect();                throw new ExceptionSocketException('Socket timed out!');            }        } while ($data === false);        return rtrim($data);    }

以上就是PHP使用Beanstalkd实例详解的详细内容,更多请关注Gxl网其它相关文章!

内容总结

以上是为您收集整理的PHP使用Beanstalkd实例详解全部内容,希望文章能够帮你解决PHP使用Beanstalkd实例详解所遇到的程序开发问题。 如果觉得技术教程内容还不错,欢迎将网站推荐给程序员好友。

内容备注

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

相关推荐

扫码关注

qrcode

QQ交谈

回顶部