www.adminn.cn
站长正能量分享网!

PHP使用Beanstalkd实例详解

AD:阿里云服务器企业会员更优惠 腾讯云香港,韩国免备案服务器1.8折优惠

有关Beanstalkd的基本概念,编译和yum的安装方法已经在上篇文章《Beanstalkd消息/任务队列的详解》中介绍了,今天练习下PHP使用Beanstalkd的过程,我选择的是使用Pheanstalk类来连接Beanstalkd

1.使用Composer安装Pheanstalk

composer require pda/pheanstalk

2.实现代码

php查看beanstalkd状态脚本Status.php

  1. <?php 
  2.  
  3. /** 
  4.  
  5.  * Created by PhpStorm. 
  6.  
  7.  * User: jmsite.cn 
  8.  
  9.  * Date: 2019/1/21 
  10.  
  11.  * Time: 10:32 
  12.  
  13.  */ 
  14.  
  15. require "../vendor/autoload.php"
  16.  
  17. use Pheanstalk\Pheanstalk; 
  18.  
  19. $pheanstalk = new Pheanstalk('192.168.75.135',11300); 
  20.  
  21. print_r($pheanstalk->stats()); 

生产者代码Producter.php

  1. <?php 
  2.  
  3. /** 
  4.  
  5.  * Created by PhpStorm. 
  6.  
  7.  * User: jmsite.cn 
  8.  
  9.  * Date: 2019/1/20 
  10.  
  11.  * Time: 16:30 
  12.  
  13.  */ 
  14.  
  15. require "../vendor/autoload.php"
  16.  
  17. use Pheanstalk\Pheanstalk; 
  18.  
  19. $pheanstalk = new Pheanstalk('192.168.75.135',11300); 
  20.  
  21. for ($i=0;$i<50;$i++){ 
  22.  
  23.     $data = array
  24.  
  25.         'key' => 'testkey'.$i
  26.  
  27.         'value' => 'testvalue'
  28.  
  29.         'time' => time(), 
  30. //phpfensi.com 
  31.     ); 
  32.  
  33.     $ret = $pheanstalk->putInTube('test-tube', json_encode($data), Pheanstalk::DEFAULT_PRIORITY, Pheanstalk::DEFAULT_DELAY, Pheanstalk::DEFAULT_TTR); 
  34.  
  35.     var_dump($ret); 
  36.  

消费者代码Consumer.php

  1. <?php 
  2.  
  3. /** 
  4.  
  5.  * Created by PhpStorm. 
  6.  
  7.  * User: jmsite.cn 
  8.  
  9.  * Date: 2019/1/20 
  10.  
  11.  * Time: 16:31 
  12.  
  13.  */ 
  14.  
  15. set_time_limit(0); 
  16.  
  17. ini_set('default_socket_timeout', 900); 
  18.  
  19. require "../vendor/autoload.php"
  20.  
  21. use Pheanstalk\Pheanstalk; 
  22.  
  23. $pheanstalk = new Pheanstalk('192.168.75.135',11300); 
  24.  
  25. while (true){ 
  26.  
  27.     $job = $pheanstalk 
  28.  
  29.         ->watch('test-tube'
  30.  
  31.         ->ignore('default'
  32.  
  33.         ->reserve(); 
  34.  
  35.     if ($job){ 
  36.  
  37.         sleep(2); 
  38.  
  39.         echo $job->getData(); 
  40.  
  41.         echo "\n"
  42.  
  43.         $pheanstalk->delete($job); 
  44.  
  45.     } 
  46.  

打开命令行/终端窗口,执行生产者,会向tube写入50条任务

  1. PS E:\repository\work\beanstalk> php .\Producter.php 
  2.  
  3. int(101) 
  4.  
  5. int(102) 
  6.  
  7. int(103) 
  8.  
  9. int(104) 
  10.  
  11. int(105) 
  12.  
  13. int(106) 
  14.  
  15. int(107) 
  16.  
  17. int(108) 
  18.  
  19. int(109) 
  20.  
  21. int(110) 
  22.  
  23. int(111) 
  24.  
  25. int(112) 
  26.  
  27. int(113) 
  28.  
  29. int(114) 
  30.  
  31. …… 

由此可见,$pheanstalk->putInTube成功后返回的是job的id

查看状态

  1. PS E:\repository\work\beanstalk> php Status.php 
  2.  
  3. Pheanstalk\Response\ArrayResponse Object 
  4.  
  5.  
  6.     [_name:Pheanstalk\Response\ArrayResponse:private] => OK 
  7.  
  8.     [storage:ArrayObject:private] => Array 
  9.  
  10.         ( 
  11.  
  12.             [current-jobs-urgent] => 0 
  13.  
  14.             [current-jobs-ready] => 50 
  15.  
  16.             [current-jobs-reserved] => 0 
  17.  
  18.             [current-jobs-delayed] => 0 
  19.  
  20.             [current-jobs-buried] => 0 
  21.  
  22.             …… 

结果中显示处于ready待读取状态的job是50个

打开两个或以上命令行/终端窗口,执行消费者,模拟多消费者竞争

消费者1

  1. PS E:\repository\work\beanstalk> php .\Consumer.php 
  2.  
  3. {"key":"testkey0","value":"testvalue","time":1548039103} 
  4.  
  5. {"key":"testkey1","value":"testvalue","time":1548039103} 
  6.  
  7. {"key":"testkey2","value":"testvalue","time":1548039103} 
  8.  
  9. {"key":"testkey4","value":"testvalue","time":1548039103} 
  10.  
  11. {"key":"testkey6","value":"testvalue","time":1548039103} 
  12.  
  13. {"key":"testkey8","value":"testvalue","time":1548039103} 
  14.  
  15. {"key":"testkey10","value":"testvalue","time":1548039103} 
  16.  
  17. {"key":"testkey12","value":"testvalue","time":1548039103} 
  18.  
  19. {"key":"testkey14","value":"testvalue","time":1548039103} 
  20.  
  21. {"key":"testkey16","value":"testvalue","time":1548039103} 
  22.  
  23. {"key":"testkey18","value":"testvalue","time":1548039103} 
  24.  
  25. {"key":"testkey20","value":"testvalue","time":1548039103} 
  26.  
  27. {"key":"testkey22","value":"testvalue","time":1548039103} 
  28.  
  29. {"key":"testkey24","value":"testvalue","time":1548039103} 
  30.  
  31. {"key":"testkey26","value":"testvalue","time":1548039103} 
  32.  
  33. {"key":"testkey28","value":"testvalue","time":1548039103} 
  34.  
  35. {"key":"testkey30","value":"testvalue","time":1548039103} 
  36.  
  37. {"key":"testkey32","value":"testvalue","time":1548039103} 
  38.  
  39. {"key":"testkey34","value":"testvalue","time":1548039103} 
  40.  
  41. {"key":"testkey36","value":"testvalue","time":1548039103} 
  42.  
  43. {"key":"testkey38","value":"testvalue","time":1548039103} 
  44.  
  45. {"key":"testkey40","value":"testvalue","time":1548039103} 
  46.  
  47. {"key":"testkey42","value":"testvalue","time":1548039103} 
  48.  
  49. {"key":"testkey44","value":"testvalue","time":1548039103} 
  50.  
  51. {"key":"testkey46","value":"testvalue","time":1548039103} 
  52.  
  53. {"key":"testkey48","value":"testvalue","time":1548039103} 

消费者2

  1. PS E:\repository\work\beanstalk> php .\Consumer.php 
  2.  
  3. {"key":"testkey3","value":"testvalue","time":1548039103} 
  4.  
  5. {"key":"testkey5","value":"testvalue","time":1548039103} 
  6.  
  7. {"key":"testkey7","value":"testvalue","time":1548039103} 
  8.  
  9. {"key":"testkey9","value":"testvalue","time":1548039103} 
  10.  
  11. {"key":"testkey11","value":"testvalue","time":1548039103} 
  12.  
  13. {"key":"testkey13","value":"testvalue","time":1548039103} 
  14.  
  15. {"key":"testkey15","value":"testvalue","time":1548039103} 
  16.  
  17. {"key":"testkey17","value":"testvalue","time":1548039103} 
  18.  
  19. {"key":"testkey19","value":"testvalue","time":1548039103} 
  20.  
  21. {"key":"testkey21","value":"testvalue","time":1548039103} 
  22.  
  23. {"key":"testkey23","value":"testvalue","time":1548039103} 
  24.  
  25. {"key":"testkey25","value":"testvalue","time":1548039103} 
  26.  
  27. {"key":"testkey27","value":"testvalue","time":1548039103} 
  28.  
  29. {"key":"testkey29","value":"testvalue","time":1548039103} 
  30.  
  31. {"key":"testkey31","value":"testvalue","time":1548039103} 
  32.  
  33. {"key":"testkey33","value":"testvalue","time":1548039103} 
  34.  
  35. {"key":"testkey35","value":"testvalue","time":1548039103} 
  36.  
  37. {"key":"testkey37","value":"testvalue","time":1548039103} 
  38.  
  39. {"key":"testkey39","value":"testvalue","time":1548039103} 
  40.  
  41. {"key":"testkey41","value":"testvalue","time":1548039103} 
  42.  
  43. {"key":"testkey43","value":"testvalue","time":1548039103} 
  44.  
  45. {"key":"testkey45","value":"testvalue","time":1548039103} 
  46.  
  47. {"key":"testkey47","value":"testvalue","time":1548039103} 
  48.  
  49. {"key":"testkey49","value":"testvalue","time":1548039103} 

两个消费者竞争着完成了全部任务,由于我的beanstalkd启动时开启了binlog持久,所以beanstalkd重启后任务也不会丢失

3.需要注意的事项

1.创建job时,设置的超时时间Pheanstalk::DEFAULT_TTR一定要比消费者处理一个job的时间要长,否则job在超时之后会被tube更改为ready状态,被其他消费者获取,而此时当前消费者还在处理该job,这就出现了一个job被多个消费者重复执行的可怕现象

2.Pheanstalk的维护者发生了变化,在新版的Pheanstalk中是不支持长连接的,当客户端socket连接服务器时间超过php.ini中设置的default_socket_timeout时,如果未能从服务端tube获得job,连接将会被断开,所以消费者进程需要维护,以便在退出后可以重新开启进程,推荐使用supervisord维护消费者进程。

判断socket超时的代码

  1. public function getLine($length = null) 
  2.  
  3.     { 
  4.  
  5.         $timeout = ini_get('default_socket_timeout'); 
  6.  
  7.         $timer   = microtime(true); 
  8.  
  9.         do { 
  10.  
  11.             $data = isset($length) ? 
  12.  
  13.                 $this->_wrapper()->fgets($this->_socket, $length) : 
  14.  
  15.                 $this->_wrapper()->fgets($this->_socket); 
  16.  
  17.             if ($this->_wrapper()->feof($this->_socket)) { 
  18.  
  19.                 throw new Exception\SocketException('Socket closed by server!'); 
  20.  
  21.             } 
  22.  
  23.             if (($data === false) && microtime(true) – $timer > $timeout) { 
  24.  
  25.                 $this->disconnect(); 
  26.  
  27.                 throw new Exception\SocketException('Socket timed out!'); 
  28.  
  29.             } 
  30.  
  31.         } while ($data === false); 
  32.  
  33.         return rtrim($data); 
  34.  
  35.     } 

模板优惠价: (点击购买)
版权声明:本文采用知识共享 署名4.0国际许可协议 [BY-NC-SA] 进行授权
文章名称:《PHP使用Beanstalkd实例详解》
文章链接:https://www.adminn.cn/news/8426.html
本站资源模板仅供个人学习交流,请于下载后24小时内删除,不允许用于商业用途,否则法律问题自行承担。2021.5月起,网站调整,暂不再分享免费模板。谢谢理解

扫码支付后请联系右侧QQ发送下载地址!!

源码请勿用于任何涉灰站点!净化网络,站长更有责!

支付宝扫一扫打赏