欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  后端开发

osx - 请教大神们:用php客户端怎么连kafka,怎么做到监控broker变化而来刷新客户端数据的?

程序员文章站 2022-05-27 14:00:41
...
最近在研究用php连kafka.
用的是githup上的nmred/kafka-php项目代码
目前:
1.已经可以连接服务器上的kafka,
2.测试:命令行执行php Produce.php,consumer端也能获取得到数据
问题:
1.consumer端怎么一直执行,难道写 while死循环?
2.kafka-php是怎么做到客户端隔段时间拉取新的信息,并刷新客户端数据的?
3.在README.md有这么一句话,又是什么意思:
 Watches broker state, if broker changes, the client will refresh     broker and topic metadata stored in the client?

请使用过该工具的赐教思路,不甚感激!

补充:现在我的php客户端 consumer能收到数据了,但是一直都包含乱码,比如我在producer段输入:nihao,则php consumer.php后,会输出:
�m�5����3�SNAPPY`���;����nihao

现在的疑问是:
1.为什么producer端输入的是:nihao,但在consumer端输出的信息为什么不是 nihao?
2.为什么会有一串乱码?
3.怎么解决?
4.下面附上socket读取的核心代码:

// 获取consumer端的信息,其中$this->stream是一个socket连接成功的socket对象:
$msg = $this->stream->read($messageSize, true);

read方法核心代码如下:
$readable = @stream_select($read, $null, $null, $this->recvTimeoutSec, $this->recvTimeoutUsec);

    if ($readable > 0) {
        $remainingBytes = $len;
        $data = $chunk = '';
        while ($remainingBytes > 0) {
            $chunk = fread($this->stream, $remainingBytes);
            if ($chunk === false) {
                $this->close();
                throw new \Kafka\Exception\SocketEOF('Could not read '.$len.' bytes from stream (no data)');
            }
            if (strlen($chunk) === 0) {
                // Zero bytes because of EOF?
                if (feof($this->stream)) {
                    $this->close();
                    throw new \Kafka\Exception\SocketEOF('Unexpected EOF while reading '.$len.' bytes from stream (no data)');
                }
                // Otherwise wait for bytes
                $readable = @stream_select($read, $null, $null, $this->recvTimeoutSec, $this->recvTimeoutUsec);
                if ($readable !== 1) {
                    throw new \Kafka\Exception\SocketTimeout('Timed out reading socket while reading ' . $len . ' bytes with ' . $remainingBytes . ' bytes to go');
                }
                continue; // attempt another read
            }
            $data .= $chunk;
            $remainingBytes -= strlen($chunk);
        }

通过这个while循环来拼接信息。重点在于 fread出来的 $chunk就包含乱码

呼高手!

回复内容:

最近在研究用php连kafka.
用的是githup上的nmred/kafka-php项目代码
目前:
1.已经可以连接服务器上的kafka,
2.测试:命令行执行php Produce.php,consumer端也能获取得到数据
问题:
1.consumer端怎么一直执行,难道写 while死循环?
2.kafka-php是怎么做到客户端隔段时间拉取新的信息,并刷新客户端数据的?
3.在README.md有这么一句话,又是什么意思:

 Watches broker state, if broker changes, the client will refresh     broker and topic metadata stored in the client?

请使用过该工具的赐教思路,不甚感激!

补充:现在我的php客户端 consumer能收到数据了,但是一直都包含乱码,比如我在producer段输入:nihao,则php consumer.php后,会输出:
�m�5����3�SNAPPY`���;����nihao

现在的疑问是:
1.为什么producer端输入的是:nihao,但在consumer端输出的信息为什么不是 nihao?
2.为什么会有一串乱码?
3.怎么解决?
4.下面附上socket读取的核心代码:

// 获取consumer端的信息,其中$this->stream是一个socket连接成功的socket对象:
$msg = $this->stream->read($messageSize, true);

read方法核心代码如下:
$readable = @stream_select($read, $null, $null, $this->recvTimeoutSec, $this->recvTimeoutUsec);

    if ($readable > 0) {
        $remainingBytes = $len;
        $data = $chunk = '';
        while ($remainingBytes > 0) {
            $chunk = fread($this->stream, $remainingBytes);
            if ($chunk === false) {
                $this->close();
                throw new \Kafka\Exception\SocketEOF('Could not read '.$len.' bytes from stream (no data)');
            }
            if (strlen($chunk) === 0) {
                // Zero bytes because of EOF?
                if (feof($this->stream)) {
                    $this->close();
                    throw new \Kafka\Exception\SocketEOF('Unexpected EOF while reading '.$len.' bytes from stream (no data)');
                }
                // Otherwise wait for bytes
                $readable = @stream_select($read, $null, $null, $this->recvTimeoutSec, $this->recvTimeoutUsec);
                if ($readable !== 1) {
                    throw new \Kafka\Exception\SocketTimeout('Timed out reading socket while reading ' . $len . ' bytes with ' . $remainingBytes . ' bytes to go');
                }
                continue; // attempt another read
            }
            $data .= $chunk;
            $remainingBytes -= strlen($chunk);
        }

通过这个while循环来拼接信息。重点在于 fread出来的 $chunk就包含乱码

呼高手!

consumer应该只能通过循环来判断broker是否有变化,然后更新说。

相关标签: php osx kafka-php