欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  后端开发

php接收rabbitMQ消息并执行繁重任务

程序员文章站 2022-06-09 13:54:26
...
1) 建立消息队列基础类
 '127.0.0.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest');    //构造函数,依次创建通道,交换机,队列    public function __construct()    {        try{            $this->_connectHandler = new AMQPConnection($this->_config);            if(!$this->_connectHandler->connect()) {                die('connect failed');            }            $this->createChannel();            $this->createExchange();            $this->createQueue();        } catch(Exception $e) {            echo $e->getMessage();        }    }        //创建通道    protected function createChannel()    {        $this->_channelObject = new AMQPChannel($this->_connectHandler);    }        //创建交换机    public function createExchange($exchangeName='', $exchangeType=AMQP_EX_TYPE_DIRECT)    {        $exName = $exchangeName?$exchangeName:$this->_exchangeName;        $this->_exchangeObject = new AMQPExchange($this->_channelObject);        $this->_exchangeObject->setName($exName);        $this->_exchangeObject->setType($exchangeType);        $this->_exchangeObject->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);        $this->_exchangeObject->declareExchange();    }        //创建队列    public function createQueue()    {        $this->_queueObject = new AMQPQueue($this->_channelObject);        $this->_queueObject->setName($this->_queueName);        $this->_queueObject->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);        $this->_queueObject->declareQueue();        $this->_queueObject->bind($this->_exchangeObject->getName(), $this->_routeKey);    }  }


2)有一个任务,会连续向队列中推送消息,累计起来,队列中会有大量的消息.....

3)客户端连续的接受队列中的消息,并执行相应的任务

_queueObject->consume(function(AMQPEnvelope $e, AMQPQueue $q) {                $requestUrl = $e->getBody();                if ($requestUrl) {                   // var_dump($requestUrl);                    $execHandler = new ExecProcess();                    $execHandler->start($requestUrl);                    $execHandler->execSave();                    unset($execHandler);                    $q->nack($e->getDeliveryTag());                } else {                    usleep(100);                }            });        }    }}$reciver = new Recv();$reciver->recvMessage();


已知 require_once 'ExecProcess.class.php'; 这个类是没有问题的,单独执行可以通过,但是加到消息队列的客户端,接收消息,并执行一个繁重任务时,注:(php-cli模式下)执行时,客户端直接退出,无报错。

如果像下面这样时,则是可以正常运行,并打印队列中的消息的

_queueObject->consume(function(AMQPEnvelope $e, AMQPQueue $q) {                $requestUrl = $e->getBody();                if ($requestUrl) {                    var_dump($requestUrl);//                    $execHandler = new ExecProcess();//                    $execHandler->start($requestUrl);//                    $execHandler->execSave();//                    unset($execHandler);                    $q->nack($e->getDeliveryTag());                } else {                    usleep(100);                }            });        }    }}$reciver = new Recv();$reciver->recvMessage();


也就是把ExecProcess.class.php'加进来时,接收消息的客户端,会自动退出,而且不会报错。。。

相反,去掉require_once 'ExecProcess.class.php';并把处理消息的逻辑去掉,是可以把队列中的消息打印出来的.....不知道是什么鬼

因为买的书,还没来得及看。

问题因该是很明显的,谁能给我一个思路,或者提示? 3Q


回复讨论(解决方案)

我觉得我很忧伤......这是要沉贴,翻船的节奏吗?

ExecProcess是不是出问题了什么

去掉require_once 'ExecProcess.class.php';并把处理消息的逻辑去掉,是可以把队列中的消息打印出来的.....

看你描述,应该是ExecProcess.class.php中,处理消息的部分出问题了,重点检查这部分代码,看看是什么异常。

执行繁重任务才出错,
可以检查是否执行超时导致。