
Kafka Socket Communication + a Strange Problem Encountered

程序员文章站 2022-03-04 13:48:57

1. Creating a topic over a raw socket with the Kafka 2.0 API:

import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.*;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

public class TopicTest {
    public static void main(String[] args){
        try {
            TopicTest topicTest=new TopicTest();
            topicTest.createTopics(10, (short) 1);
        }catch (IOException e){
            e.printStackTrace();//don't swallow I/O failures silently
        }
    }

    //Kafka defines a matching Request/Response pair for every operation, e.g. the CreateTopicsRequest and CreateTopicsResponse used here
    public void createTopics(int partitions,short replicationFactor) throws IOException {
        Map<String, CreateTopicsRequest.TopicDetails> topics=new HashMap<>();
        topics.put("newtopic",new CreateTopicsRequest.TopicDetails(partitions,replicationFactor));
        CreateTopicsRequest request=new CreateTopicsRequest.Builder(topics,60000).build();
        ByteBuffer response=send("localhost",9092,request, ApiKeys.CREATE_TOPICS);
        CreateTopicsResponse createResponse=CreateTopicsResponse.parse(response,request.version());
        System.out.println(createResponse.errors());//per-topic error codes; NONE means success
    }

    private ByteBuffer send(String host, int port, AbstractRequest request, ApiKeys apiKeys) throws IOException {
        Socket socket=new Socket(host,port);
        try {
            return send(request,apiKeys,socket);
        }finally {
            socket.close();
        }
    }

    private ByteBuffer send(AbstractRequest request, ApiKeys apiKeys, Socket socket) throws IOException {
        RequestHeader header=new RequestHeader(apiKeys,request.version(),"client",0);
        byte[] response=issueRequestAndWaitForResponse(socket,header,request);
        ByteBuffer responseBuffer=ByteBuffer.wrap(response);
        ResponseHeader.parse(responseBuffer);
        return responseBuffer;
    }

    private byte[] issueRequestAndWaitForResponse(Socket socket,RequestHeader header,AbstractRequest request) throws IOException {
        DataOutputStream dos=new DataOutputStream(socket.getOutputStream());
        byte[] serializedRequest=request.serialize(header).array();
        dos.writeInt(serializedRequest.length);
        dos.write(serializedRequest);
        dos.flush();
        //do NOT call dos.close() here: closing a socket's output stream closes the socket itself,
        //so the response below could never be read
        DataInputStream dis=new DataInputStream(socket.getInputStream());
        byte[] response=new byte[dis.readInt()];
        dis.readFully(response);
        return response;
    }
}


The book 《Apache Kafka实战》 builds the request like this:

ByteBuffer buffer=ByteBuffer.allocate(header.sizeOf()+request.sizeOf());
header.writeTo(buffer);
request.writeTo(buffer);
byte[] serializedRequest=buffer.array();


Tested against the 2.0 API, there are no sizeOf() or writeTo() methods;
request.serialize(header).array() is used instead.
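Either way, the bytes that cross the wire use Kafka's framing: a 4-byte big-endian length prefix followed by the serialized request, which is exactly what issueRequestAndWaitForResponse writes and reads. A minimal, Kafka-free sketch of that framing (plain JDK only, with a hypothetical payload standing in for the serialized request):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class FramingDemo {
    // Prepend a 4-byte big-endian size, as issueRequestAndWaitForResponse does
    static byte[] frame(byte[] payload) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(buf);
        dos.writeInt(payload.length);   // size prefix
        dos.write(payload);             // request bytes (header + body)
        dos.flush();
        return buf.toByteArray();
    }

    // Read one frame back: the size first, then exactly that many bytes
    static byte[] unframe(InputStream in) throws IOException {
        DataInputStream dis = new DataInputStream(in);
        byte[] payload = new byte[dis.readInt()];
        dis.readFully(payload);
        return payload;
    }

    public static void main(String[] args) throws IOException {
        byte[] request = "dummy-request".getBytes(StandardCharsets.UTF_8);
        byte[] framed = frame(request);
        byte[] back = unframe(new ByteArrayInputStream(framed));
        System.out.println(framed.length - request.length);           // prints 4
        System.out.println(new String(back, StandardCharsets.UTF_8)); // prints dummy-request
    }
}
```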

After running the program, query the topics with the bundled script:

[email protected]:/usr/kafka# bin/kafka-topics.sh --list --zookeeper localhost:2181
newtopic

2. A strange Kafka + ZooKeeper problem:
ZooKeeper started fine, but Kafka refused to start, timing out with:

[2018-08-24 07:59:48,997] INFO Socket connection established to localhost/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2018-08-24 07:59:55,000] WARN Client session timed out, have not heard from server in 6003ms for sessionid 0x0 (org.apache.zookeeper.ClientCnxn)
[2018-08-24 07:59:55,009] INFO Client session timed out, have not heard from server in 6003ms for sessionid 0x0, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)

zookeeper.connection.timeout.ms had already been set to 10000; raising it to 60000 made no difference.
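For reference, that setting lives in Kafka's config/server.properties (the values shown are the ones tried in this post):

```properties
# config/server.properties
zookeeper.connect=localhost:2181
# how long the broker waits when establishing the ZooKeeper session
zookeeper.connection.timeout.ms=60000
```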
Search results suggested the data volume was too large, but this machine holds very little data.
Since everything runs locally, a network problem seemed unlikely, so the next suspect was the port. Under WSL, however, netstat showed nothing (empty output; unclear whether that is specific to this machine or WSL in general).
Running netstat -ano in cmd instead revealed two processes occupying port 2181 (only one ZooKeeper instance had been started); looking up the PIDs in Task Manager showed both were java.
Back in WSL, pkill java killed the leftover processes, and ZooKeeper and Kafka then started cleanly.
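The root cause was a stale java process still bound to 2181. A quick JDK-only way to check whether a local port is already taken (a sketch, not part of the original troubleshooting) is to try binding it and see whether the bind fails:

```java
import java.io.IOException;
import java.net.ServerSocket;

public class PortCheck {
    // Returns true if nothing is currently bound to the given port
    static boolean isFree(int port) {
        try (ServerSocket ss = new ServerSocket(port)) {
            return true;    // bind succeeded: the port was free
        } catch (IOException e) {
            return false;   // bind failed: another process holds the port
        }
    }

    public static void main(String[] args) {
        // 2181 is ZooKeeper's default client port
        System.out.println("port 2181 free: " + isFree(2181));
    }
}
```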
