kafka装配及Kafka-PHP扩展的使用
话说用了就要有点产出,要不然过段时间又忘了,所以在这里就记录一下试用Kafka的安装过程和php扩展的试用。
实话说,如果用于队列的话,跟PHP比较配的,还是Redis。用的顺手,呵呵,只是Redis不能有多个consumer。但Kafka官方对PHP不支持,PHP扩展是爱好者或使用者写的。下面就开始讲Kafka的安装吧。我以CentOS6.4为例,64位。
一. 首先确认下jdk有没有安装
使用命令
[[email protected] ~]# java -versionjava version "1.8.0_73"Java(TM) SE Runtime Environment (build 1.8.0_73-b02)Java HotSpot(TM) 64-Bit Server VM (build 25.73-b02, mixed mode)
如果有以上信息的话,就往下安装吧,有些可能是jdk对不上,那就装到对的上的。如果没有安装,就看一下下面的jdk安装方法:
http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html
到这个地址下载jdk8版本,我下载的是jdk-8u73-linux-x64.tar.gz,然后解压到/usr/local/jdk/下。
然后打开/etc/profile文件
[[email protected] ~]# vim /etc/profile
把下面这段代码写到文件里
export JAVA_HOME=/usr/local/jdk/jdk1.8.0_73export CLASSPATH=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jarexport PATH=$JAVA_HOME/bin:$PATH
最后
[[email protected] ~]# source /etc/profile
这时jdk就生效了,可以使用 java -version验证下。
二. 接下来安装Kafka
1. 下载Kafka
到http://kafka.apache.org/downloads.html下载相应的版本,我使用的是kafka_2.9.1-0.8.2.2.tgz。
2. 下载完解压到你喜欢的目录
我是解压到 /usr/local/kafka/kafka_2.9.1-0.8.2.2
3. 运行默认的Kafka
启动Zookeeper server
[[email protected] kafka_2.9.1-0.8.2.2]# sh bin/zookeeper-server-start.sh config/zookeeper.properties &
启动Kafka server
[[email protected] kafka_2.9.1-0.8.2.2]# sh bin/kafka-server-start.sh config/server.properties &
运行生产者producer
[[email protected] kafka_2.9.1-0.8.2.2]# sh bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
运行消费者consumer
[[email protected] kafka_2.9.1-0.8.2.2]# sh bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
这样,在producer那边输入内容,consumer马上就能接收到。
4. 当有跨机的producer或consumer连接时
需要配置config/server.properties的host.name,要不然跨机的连不上。
三. Kafka-PHP扩展
使用了一圈,就https://github.com/nmred/kafka-php可以用。
我是使用composer安装的,以下是示例:
producer.php
phprequire 'vendor/autoload.php';while (1) { $part = mt_rand(0, 1); $produce = \Kafka\Produce::getInstance('kafka0:2181', 3000); // get available partitions $partitions = $produce->getAvailablePartitions('topic_name'); var_dump($partitions); // send message $produce->setRequireAck(-1); $produce->setMessages('topic_name', 0, array(date('Y-m-d H:i:s')); sleep(3);}
consumer.php
require 'vendor/autoload.php';$consumer = \Kafka\Consumer::getInstance('kafka0:2181');$group = 'topic_name';$consumer->setGroup($group);$consumer->setFromOffset(true);$consumer->setTopic('topic_name', 0);$consumer->setMaxBytes(102400);$result = $consumer->fetch();print_r($result);foreach ($result as $topicName => $partition) { foreach ($partition as $partId => $messageSet) { var_dump($partition->getHighOffset()); foreach ($messageSet as $message) { var_dump((string)$message); } var_dump($partition->getMessageOffset()); }}
相关文章
相关视频
专题推荐
-
独孤九贱-php全栈开发教程
全栈 170W+
主讲:Peter-Zhu 轻松幽默、简短易学,非常适合PHP学习入门
-
玉女心经-web前端开发教程
入门 80W+
主讲:灭绝师太 由浅入深、明快简洁,非常适合前端学习入门
-
天龙八部-实战开发教程
实战 120W+
主讲:西门大官人 思路清晰、严谨规范,适合有一定web编程基础学习
- 最新文章
- 热门排行
上一篇: Spring mvc环境搭建
下一篇: 使用 PHP 编写的 Kafka 客户端
推荐阅读
-
babel的使用及安装配置教程
-
kafka的go版本api介绍及使用示例
-
CentOS 5.X xen虚拟机的安装配置及基本使用
-
SQLServer中使用扩展事件获取Session级别的等待信息及SQLServer 2016中Session级别等待信息的增强
-
USB3.0扩展卡速度很慢的原因和解决办法及使用注意
-
CentOS系统下docker的安装配置及使用介绍
-
python自带缓存lru_cache用法及扩展的使用
-
用LinQ扩展方法,泛型扩展方法,实现自定义验证字符是否空、对象是否为null,及泛型约束使用,Action的使用
-
Linux中的进程守护supervisor安装配置及使用
-
Gitee的下载安装配置及使用步骤详解
网友评论
文明上网理性发言,请遵守 新闻评论服务协议
我要评论