欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

php测试kafka项目示例

程序员文章站 2022-06-03 12:49:19
本文实例讲述了php测试kafka项目。分享给大家供大家参考,具体如下:概述kafka是最初由linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志...

本文实例讲述了php测试kafka项目。分享给大家供大家参考,具体如下:

概述

kafka是最初由linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做mq系统),常见可以用于web/nginx日志、访问日志,消息服务等等,linkedin于2010年贡献给了apache基金会并成为*开源项目。

主要应用场景是:日志收集系统和消息系统。

安装kafka-php项目依赖

composer require nmred/kafka-php

produce.php

<?php
require './vendor/autoload.php';
date_default_timezone_set('prc');
$config = \kafka\producerconfig::getinstance();
$config->setmetadatarefreshintervalms(10000);
$config->setmetadatabrokerlist('127.0.0.1:9092');
$config->setbrokerversion('0.10.2.1');
$config->setrequiredack(1);
$config->setisasyn(false);
$config->setproduceinterval(500);
$producer = new \kafka\producer(function() {
 $t = time();
 return array(
 array(
  'topic' => 'test',
  'value' => $t,
  'key' => $t,
 ),
 );
});
$producer->success(function($result) {
 var_export($result);
});
$producer->error(function($errorcode) {
 var_dump('error', $errorcode);
});
$producer->send();

consumer.php

<?php
require './vendor/autoload.php';
date_default_timezone_set('prc');
$config = \kafka\consumerconfig::getinstance();
$config->setmetadatarefreshintervalms(10000);
$config->setmetadatabrokerlist('127.0.0.1:9092');
$config->setgroupid('test');
$config->setbrokerversion('0.10.2.1');
$config->settopics(array('test'));
$consumer = new \kafka\consumer();
$consumer->start(function($topic, $part, $message) {
 var_dump($message);
});

测试生产者

php produce.php

测试消费者

php consumer.php