osx - 请教大神们:用php客户端怎么连kafka,怎么做到监控broker变化而来刷新客户端数据的?
用的是githup上的nmred/kafka-php项目代码
目前:
1.已经可以连接服务器上的kafka,
2.测试:命令行执行php Produce.php,consumer端也能获取得到数据
问题:
1.consumer端怎么一直执行,难道写 while死循环?
2.kafka-php是怎么做到客户端隔段时间拉取新的信息,并刷新客户端数据的?
3.在README.md有这么一句话,又是什么意思:
Watches broker state, if broker changes, the client will refresh broker and topic metadata stored in the client?
请使用过该工具的赐教思路,不甚感激!
补充:现在我的php客户端 consumer能收到数据了,但是一直都包含乱码,比如我在producer段输入:nihao,则php consumer.php后,会输出:
�m�5����3�SNAPPY`���;����nihao
现在的疑问是:
1.为什么producer端输入的是:nihao,但在consumer端输出的信息为什么不是 nihao?
2.为什么会有一串乱码?
3.怎么解决?
4.下面附上socket读取的核心代码:
// 获取consumer端的信息,其中$this->stream是一个socket连接成功的socket对象:
$msg = $this->stream->read($messageSize, true);
read方法核心代码如下:
$readable = @stream_select($read, $null, $null, $this->recvTimeoutSec, $this->recvTimeoutUsec);
if ($readable > 0) {
$remainingBytes = $len;
$data = $chunk = '';
while ($remainingBytes > 0) {
$chunk = fread($this->stream, $remainingBytes);
if ($chunk === false) {
$this->close();
throw new \Kafka\Exception\SocketEOF('Could not read '.$len.' bytes from stream (no data)');
}
if (strlen($chunk) === 0) {
// Zero bytes because of EOF?
if (feof($this->stream)) {
$this->close();
throw new \Kafka\Exception\SocketEOF('Unexpected EOF while reading '.$len.' bytes from stream (no data)');
}
// Otherwise wait for bytes
$readable = @stream_select($read, $null, $null, $this->recvTimeoutSec, $this->recvTimeoutUsec);
if ($readable !== 1) {
throw new \Kafka\Exception\SocketTimeout('Timed out reading socket while reading ' . $len . ' bytes with ' . $remainingBytes . ' bytes to go');
}
continue; // attempt another read
}
$data .= $chunk;
$remainingBytes -= strlen($chunk);
}
通过这个while循环来拼接信息。重点在于 fread出来的 $chunk就包含乱码
呼高手!
回复内容:
最近在研究用php连kafka.
用的是githup上的nmred/kafka-php项目代码
目前:
1.已经可以连接服务器上的kafka,
2.测试:命令行执行php Produce.php,consumer端也能获取得到数据
问题:
1.consumer端怎么一直执行,难道写 while死循环?
2.kafka-php是怎么做到客户端隔段时间拉取新的信息,并刷新客户端数据的?
3.在README.md有这么一句话,又是什么意思:
Watches broker state, if broker changes, the client will refresh broker and topic metadata stored in the client?
请使用过该工具的赐教思路,不甚感激!
补充:现在我的php客户端 consumer能收到数据了,但是一直都包含乱码,比如我在producer段输入:nihao,则php consumer.php后,会输出:
�m�5����3�SNAPPY`���;����nihao
现在的疑问是:
1.为什么producer端输入的是:nihao,但在consumer端输出的信息为什么不是 nihao?
2.为什么会有一串乱码?
3.怎么解决?
4.下面附上socket读取的核心代码:
// 获取consumer端的信息,其中$this->stream是一个socket连接成功的socket对象:
$msg = $this->stream->read($messageSize, true);
read方法核心代码如下:
$readable = @stream_select($read, $null, $null, $this->recvTimeoutSec, $this->recvTimeoutUsec);
if ($readable > 0) {
$remainingBytes = $len;
$data = $chunk = '';
while ($remainingBytes > 0) {
$chunk = fread($this->stream, $remainingBytes);
if ($chunk === false) {
$this->close();
throw new \Kafka\Exception\SocketEOF('Could not read '.$len.' bytes from stream (no data)');
}
if (strlen($chunk) === 0) {
// Zero bytes because of EOF?
if (feof($this->stream)) {
$this->close();
throw new \Kafka\Exception\SocketEOF('Unexpected EOF while reading '.$len.' bytes from stream (no data)');
}
// Otherwise wait for bytes
$readable = @stream_select($read, $null, $null, $this->recvTimeoutSec, $this->recvTimeoutUsec);
if ($readable !== 1) {
throw new \Kafka\Exception\SocketTimeout('Timed out reading socket while reading ' . $len . ' bytes with ' . $remainingBytes . ' bytes to go');
}
continue; // attempt another read
}
$data .= $chunk;
$remainingBytes -= strlen($chunk);
}
通过这个while循环来拼接信息。重点在于 fread出来的 $chunk就包含乱码
呼高手!
consumer应该只能通过循环来判断broker是否有变化,然后更新说。