php测试kafka项目示例
程序员文章站
2023-09-08 09:43:48
本文实例讲述了php测试kafka项目。分享给大家供大家参考,具体如下:概述kafka是最初由linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志...
本文实例讲述了php测试kafka项目。分享给大家供大家参考,具体如下:
概述
kafka是最初由linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做mq系统),常见可以用于web/nginx日志、访问日志,消息服务等等,linkedin于2010年贡献给了apache基金会并成为*开源项目。
主要应用场景是:日志收集系统和消息系统。
安装kafka-php项目依赖
composer require nmred/kafka-php
produce.php
<?php require './vendor/autoload.php'; date_default_timezone_set('prc'); $config = \kafka\producerconfig::getinstance(); $config->setmetadatarefreshintervalms(10000); $config->setmetadatabrokerlist('127.0.0.1:9092'); $config->setbrokerversion('0.10.2.1'); $config->setrequiredack(1); $config->setisasyn(false); $config->setproduceinterval(500); $producer = new \kafka\producer(function() { $t = time(); return array( array( 'topic' => 'test', 'value' => $t, 'key' => $t, ), ); }); $producer->success(function($result) { var_export($result); }); $producer->error(function($errorcode) { var_dump('error', $errorcode); }); $producer->send();
consumer.php
<?php require './vendor/autoload.php'; date_default_timezone_set('prc'); $config = \kafka\consumerconfig::getinstance(); $config->setmetadatarefreshintervalms(10000); $config->setmetadatabrokerlist('127.0.0.1:9092'); $config->setgroupid('test'); $config->setbrokerversion('0.10.2.1'); $config->settopics(array('test')); $consumer = new \kafka\consumer(); $consumer->start(function($topic, $part, $message) { var_dump($message); });
测试生产者
php produce.php
测试消费者
php consumer.php