有關Beanstalkd的基本概念,編譯和yum的安裝方法已經在上篇文章《Beanstalkd消息/任務隊列的詳解》中介紹了,今天練習下PHP使用Beanstalkd的過程,我選擇的是使用Pheanstalk類來連接Beanstalkd
1.使用Composer安裝Pheanstalk
composer require pda/pheanstalk
2.實現代碼
php查看beanstalkd狀態腳本Status.php
- <?php
- /**
- * Created by PhpStorm.
- * User: jmsite.cn
- * Date: 2019/1/21
- * Time: 10:32
- */
- require "../vendor/autoload.php";
- use Pheanstalk/Pheanstalk;
- $pheanstalk = new Pheanstalk('192.168.75.135',11300);
- print_r($pheanstalk->stats());
生產者代碼Producter.php
- <?php
- /**
- * Created by PhpStorm.
- * User: jmsite.cn
- * Date: 2019/1/20
- * Time: 16:30
- */
- require "../vendor/autoload.php";
- use Pheanstalk/Pheanstalk;
- $pheanstalk = new Pheanstalk('192.168.75.135',11300);
- for ($i=0;$i<50;$i++){
- $data = array(
- 'key' => 'testkey'.$i,
- 'value' => 'testvalue',
- 'time' => time(),
- //Vevb.com
- );
- $ret = $pheanstalk->putInTube('test-tube', json_encode($data), Pheanstalk::DEFAULT_PRIORITY, Pheanstalk::DEFAULT_DELAY, Pheanstalk::DEFAULT_TTR);
- var_dump($ret);
- }
消費者代碼Consumer.php
- <?php
- /**
- * Created by PhpStorm.
- * User: jmsite.cn
- * Date: 2019/1/20
- * Time: 16:31
- */
- set_time_limit(0);
- ini_set('default_socket_timeout', 900);
- require "../vendor/autoload.php";
- use Pheanstalk/Pheanstalk;
- $pheanstalk = new Pheanstalk('192.168.75.135',11300);
- while (true){
- $job = $pheanstalk
- ->watch('test-tube')
- ->ignore('default')
- ->reserve();
- if ($job){
- sleep(2);
- echo $job->getData();
- echo "/n";
- $pheanstalk->delete($job);
- }
- }
打開命令行/終端窗口,執行生產者,會向tube寫入50條任務
- PS E:/repository/work/beanstalk> php ./Producter.php
- int(101)
- int(102)
- int(103)
- int(104)
- int(105)
- int(106)
- int(107)
- int(108)
- int(109)
- int(110)
- int(111)
- int(112)
- int(113)
- int(114)
- ......
由此可見,$pheanstalk->putInTube成功后返回的是job的id
查看狀態
- PS E:/repository/work/beanstalk> php Status.php
- Pheanstalk/Response/ArrayResponse Object
- (
- [_name:Pheanstalk/Response/ArrayResponse:private] => OK
- [storage:ArrayObject:private] => Array
- (
- [current-jobs-urgent] => 0
- [current-jobs-ready] => 50
- [current-jobs-reserved] => 0
- [current-jobs-delayed] => 0
- [current-jobs-buried] => 0
- ......
結果中顯示處于ready待讀取狀態的job是50個
打開兩個或以上命令行/終端窗口,執行消費者,模擬多消費者競爭
消費者1
- PS E:/repository/work/beanstalk> php ./Consumer.php
- {"key":"testkey0","value":"testvalue","time":1548039103}
- {"key":"testkey1","value":"testvalue","time":1548039103}
- {"key":"testkey2","value":"testvalue","time":1548039103}
- {"key":"testkey4","value":"testvalue","time":1548039103}
- {"key":"testkey6","value":"testvalue","time":1548039103}
- {"key":"testkey8","value":"testvalue","time":1548039103}
- {"key":"testkey10","value":"testvalue","time":1548039103}
- {"key":"testkey12","value":"testvalue","time":1548039103}
- {"key":"testkey14","value":"testvalue","time":1548039103}
- {"key":"testkey16","value":"testvalue","time":1548039103}
- {"key":"testkey18","value":"testvalue","time":1548039103}
- {"key":"testkey20","value":"testvalue","time":1548039103}
- {"key":"testkey22","value":"testvalue","time":1548039103}
- {"key":"testkey24","value":"testvalue","time":1548039103}
- {"key":"testkey26","value":"testvalue","time":1548039103}
- {"key":"testkey28","value":"testvalue","time":1548039103}
- {"key":"testkey30","value":"testvalue","time":1548039103}
- {"key":"testkey32","value":"testvalue","time":1548039103}
- {"key":"testkey34","value":"testvalue","time":1548039103}
- {"key":"testkey36","value":"testvalue","time":1548039103}
- {"key":"testkey38","value":"testvalue","time":1548039103}
- {"key":"testkey40","value":"testvalue","time":1548039103}
- {"key":"testkey42","value":"testvalue","time":1548039103}
- {"key":"testkey44","value":"testvalue","time":1548039103}
- {"key":"testkey46","value":"testvalue","time":1548039103}
- {"key":"testkey48","value":"testvalue","time":1548039103}
消費者2
- PS E:/repository/work/beanstalk> php ./Consumer.php
- {"key":"testkey3","value":"testvalue","time":1548039103}
- {"key":"testkey5","value":"testvalue","time":1548039103}
- {"key":"testkey7","value":"testvalue","time":1548039103}
- {"key":"testkey9","value":"testvalue","time":1548039103}
- {"key":"testkey11","value":"testvalue","time":1548039103}
- {"key":"testkey13","value":"testvalue","time":1548039103}
- {"key":"testkey15","value":"testvalue","time":1548039103}
- {"key":"testkey17","value":"testvalue","time":1548039103}
- {"key":"testkey19","value":"testvalue","time":1548039103}
- {"key":"testkey21","value":"testvalue","time":1548039103}
- {"key":"testkey23","value":"testvalue","time":1548039103}
- {"key":"testkey25","value":"testvalue","time":1548039103}
- {"key":"testkey27","value":"testvalue","time":1548039103}
- {"key":"testkey29","value":"testvalue","time":1548039103}
- {"key":"testkey31","value":"testvalue","time":1548039103}
- {"key":"testkey33","value":"testvalue","time":1548039103}
- {"key":"testkey35","value":"testvalue","time":1548039103}
- {"key":"testkey37","value":"testvalue","time":1548039103}
- {"key":"testkey39","value":"testvalue","time":1548039103}
- {"key":"testkey41","value":"testvalue","time":1548039103}
- {"key":"testkey43","value":"testvalue","time":1548039103}
- {"key":"testkey45","value":"testvalue","time":1548039103}
- {"key":"testkey47","value":"testvalue","time":1548039103}
- {"key":"testkey49","value":"testvalue","time":1548039103}
兩個消費者競爭著完成了全部任務,由于我的beanstalkd啟動時開啟了binlog持久,所以beanstalkd重啟后任務也不會丟失
3.需要注意的事項
1.創建job時,設置的超時時間Pheanstalk::DEFAULT_TTR一定要比消費者處理一個job的時間要長,否則job在超時之后會被tube更改為ready狀態,被其他消費者獲取,而此時當前消費者還在處理該job,這就出現了一個job被多個消費者重復執行的可怕現象
2.Pheanstalk的維護者發生了變化,在新版的Pheanstalk中是不支持長連接的,當客戶端socket連接服務器時間超過php.ini中設置的default_socket_timeout時,如果未能從服務端tube獲得job,連接將會被斷開,所以消費者進程需要維護,以便在退出后可以重新開啟進程,推薦使用supervisord維護消費者進程。
判斷socket超時的代碼
- public function getLine($length = null)
- {
- $timeout = ini_get('default_socket_timeout');
- $timer = microtime(true);
- do {
- $data = isset($length) ?
- $this->_wrapper()->fgets($this->_socket, $length) :
- $this->_wrapper()->fgets($this->_socket);
- if ($this->_wrapper()->feof($this->_socket)) {
- throw new Exception/SocketException('Socket closed by server!');
- }
- if (($data === false) && microtime(true) - $timer > $timeout) {
- $this->disconnect();
- throw new Exception/SocketException('Socket timed out!');
- }
- } while ($data === false);
- return rtrim($data);
- }
新聞熱點
疑難解答