Tp5使用Kafka:封装生产者、消费者操作类
程序员文章站
2024-01-14 11:32:22
...
【相关文章】PHP操作Kafka:php-rdkafka扩展的安装
1、config.php中配置:
// Kafka connection settings, read via config('kafka_server') by KafkaProducer and KafkaConsumer.
'kafka_server' => [
// Broker list handed to addBrokers(); "host:port" form.
'host' => '127.0.0.1:9092',
// Topic name handed to newTopic() for both producing and consuming.
'topic' => 'topic1',
],
2、创建一个生产者 KafkaProducer.php :
<?php
namespace app\index\controller;
/**
 * Singleton wrapper around an RdKafka producer bound to the broker list and
 * topic found under the `kafka_server` config entry.
 */
class KafkaProducer
{
    /** @var \RdKafka\Producer|null Producer handle, created in the constructor. */
    private $_rk = null;

    /** @var mixed Topic handle returned by newTopic() for the configured topic. */
    private $_topic = null;

    /** @var self|null Shared instance handed out by getInstance(). */
    private static $_instance = null;

    /** @var array Value of config('kafka_server'); `host` and `topic` are read from it. */
    private $_config = [];

    /**
     * Return the shared producer, constructing it on first use.
     *
     * @return self
     */
    public static function getInstance()
    {
        if (!empty(self::$_instance)) {
            return self::$_instance;
        }

        return self::$_instance = new self();
    }

    /**
     * Build the producer handle and the topic handle from configuration.
     * Private so that instances are only obtained through getInstance().
     */
    private function __construct()
    {
        $settings = config('kafka_server');

        $producer = new \RdKafka\Producer();
        $producer->setLogLevel(LOG_DEBUG);
        $producer->addBrokers($settings['host']);

        $this->_config = $settings;
        $this->_rk = $producer;
        $this->_topic = $producer->newTopic($settings['topic']);
    }

    /**
     * Queue one message on the configured topic and wait until the
     * producer's outbound queue is empty.
     *
     * @param string $data Message payload (callers in this file pass a JSON string).
     * @return void
     */
    public function add($data)
    {
        $producer = $this->_rk;

        // RD_KAFKA_PARTITION_UA: no partition is chosen by the caller.
        $this->_topic->produce(RD_KAFKA_PARTITION_UA, 0, $data);

        // First poll is non-blocking; afterwards poll in 50 ms steps for as
        // long as the outbound queue still reports pending messages.
        $waitMs = 0;
        do {
            $producer->poll($waitMs);
            $waitMs = 50;
        } while ($producer->getOutQLen() > 0);
    }
}
3、创建一个消费者 KafkaConsumer.php :
<?php
namespace app\index\controller;
/**
 * Low-level Kafka consumer that reads a single partition of the configured
 * topic and dispatches every message to a service class located under the
 * `app\index\service` namespace.
 *
 * Expected message payload: a JSON object containing at least
 * `className` (service class short name) and `funcName` (method to call).
 * The raw JSON string is passed to that method as its only argument.
 */
class KafkaConsumer
{
    /** @var mixed Topic handle returned by newTopic(); consumption is started on it. */
    private $_topic = null;

    /** @var int Partition this consumer reads from. */
    private $_partition = 0;

    /** @var array Value of config('kafka_server'); `host` and `topic` are read from it. */
    private $_config = [];

    /**
     * Connect to the brokers and start consuming the given partition from
     * the stored offset.
     *
     * @param int|string $groupId   Value used for the `group.id` setting.
     * @param int        $partition Partition to consume.
     */
    public function __construct($groupId=0, $partition=0)
    {
        $this->_partition = $partition;
        $this->_config = config('kafka_server');

        $conf = new \RdKafka\Conf();
        // Conf::set()/TopicConf::set() take string values; cast explicitly
        // instead of relying on implicit scalar coercion.
        $conf->set('group.id', (string) $groupId);

        $rk = new \RdKafka\Consumer($conf);
        $rk->addBrokers($this->_config['host']);

        $topicConf = new \RdKafka\TopicConf();
        $topicConf->set('auto.commit.interval.ms', '100');
        // NOTE(review): file-based offset storage appears as deprecated in
        // librdkafka's configuration reference — confirm against the
        // librdkafka version in use before relying on it.
        $topicConf->set('offset.store.method', 'file');
        $topicConf->set('offset.store.path', sys_get_temp_dir());
        // Start from the earliest message when no stored offset exists.
        $topicConf->set('auto.offset.reset', 'smallest');

        $this->_topic = $rk->newTopic($this->_config['topic'], $topicConf);
        $this->_topic->consumeStart($partition, RD_KAFKA_OFFSET_STORED);
    }

    /**
     * Consume messages forever, dispatching each successful one to exceTask().
     *
     * @return void
     * @throws \Exception When the message carries an error other than
     *                    "partition EOF" or "timed out".
     */
    public function run()
    {
        while (true) {
            // 120*10000 = 1,200,000; the second argument is the wait
            // timeout (milliseconds per the php-rdkafka docs).
            $message = $this->_topic->consume($this->_partition, 120*10000);
            Logger('KafkaConsumer::run::1', [$message]);

            // consume() may hand back null instead of a Message when nothing
            // arrived in time; treat it like a timeout instead of
            // dereferencing null.
            if ($message === null) {
                error_log("Timed out\n");
                continue;
            }

            switch ($message->err) {
                case RD_KAFKA_RESP_ERR_NO_ERROR:
                    $this->exceTask($message->payload);
                    break;
                case RD_KAFKA_RESP_ERR__PARTITION_EOF: // caught up; wait for new messages
                    error_log("No more messages; will wait for more\n");
                    break;
                case RD_KAFKA_RESP_ERR__TIMED_OUT: // nothing received within the timeout
                    error_log("Timed out\n");
                    break;
                default:
                    throw new \Exception($message->errstr(), $message->err);
            }
        }
    }

    /**
     * Decode one message and invoke the service method it names.
     *
     * Malformed payloads, unknown classes/methods and failures inside the
     * task are logged and swallowed so that a single bad message cannot
     * terminate the consumer loop.
     *
     * @param string $jsonData Raw JSON payload of the message.
     * @return mixed|null Return value of the service method, or null when
     *                    the message could not be dispatched or the task failed.
     */
    private function exceTask($jsonData)
    {
        error_log($jsonData);

        $paramsArr = json_decode($jsonData, true);
        if (!is_array($paramsArr)
            || !isset($paramsArr['className'], $paramsArr['funcName'])
            || !is_string($paramsArr['className'])
            || !is_string($paramsArr['funcName'])
        ) {
            error_log("Param error\n");
            // Stop here: continuing would try to instantiate a class with an
            // empty name.
            return null;
        }

        // Dynamic class names are always resolved as fully qualified.
        $objUrl = 'app\\index\\service\\' . $paramsArr['className'];
        if (!class_exists($objUrl)) {
            error_log("Func not found\n");
            return null;
        }

        try {
            $obj = new $objUrl();
            $handler = [$obj, $paramsArr['funcName']];
            if (!is_callable($handler)) {
                error_log("Func not found\n");
                return null;
            }

            return call_user_func($handler, $jsonData);
        } catch (\Exception $e) {
            $this->logTaskFailure($objUrl, $paramsArr['funcName'], $e);
        } catch (\Throwable $e) {
            // PHP 7+: engine errors (\Error) raised by the task do not
            // extend \Exception and need their own catch clause.
            $this->logTaskFailure($objUrl, $paramsArr['funcName'], $e);
        }

        return null;
    }

    /**
     * Log a failure raised while running a dispatched task.
     *
     * @param string                $class  Fully qualified service class name.
     * @param string                $method Method that was invoked.
     * @param \Exception|\Throwable $e      What the task threw.
     * @return void
     */
    private function logTaskFailure($class, $method, $e)
    {
        error_log(sprintf(
            "Task failed: %s::%s: %s: %s\n",
            $class,
            $method,
            get_class($e),
            $e->getMessage()
        ));
    }
}
4、测试:
- 调用生产者生产 Test.php:
<?php
namespace app\index\controller;
/**
 * Demo controller that publishes one sample task message through
 * KafkaProducer.
 *
 * NOTE(review): no `use` statement for `Controller` is visible in this
 * snippet, so the name resolves inside the current namespace — confirm the
 * intended base class.
 */
class Test extends Controller
{
    /**
     * Publish a JSON task naming the service class and method the consumer
     * should invoke, plus a sample `user_id`.
     *
     * @return void
     */
    public function producer()
    {
        $payload = json_encode([
            'className' => 'TestService',
            'funcName' => 'index',
            'user_id' => 1,
        ]);

        KafkaProducer::getInstance()->add($payload);
    }
}
- 消费者消费回调 TestService.php :
<?php
namespace app\index\service;
/**
 * Sample consumer callback used to verify end-to-end delivery.
 */
class TestService
{
    /**
     * Decode the first argument (when it is a string) as JSON and write it
     * to the log once per second, 100 times.
     *
     * The argument is read dynamically; the method declares no parameters.
     *
     * @return void
     */
    public function index()
    {
        $payload = func_num_args() > 0 ? func_get_arg(0) : null;
        $paramArr = is_string($payload) ? json_decode($payload, true) : [];

        $remaining = 100;
        while ($remaining-- > 0) {
            sleep(1);
            Logger('TestService::test::1', [$paramArr]);
        }
    }
}
- 先启动消费者 KafkaConsumer -> run(),再调用 Test.php 中的生产者方法发送消息。此时如果日志中出现消费回调 TestService::index 写入的记录(标签为 TestService::test::1,内容为消息解码后的参数),则说明消费者消费成功。
下一篇: #QT练习(一)