Kafka Socket Communication + a Strange Problem I Ran Into
程序员文章站
2022-03-04 13:48:57
1. Creating a topic over a raw Socket with the Kafka 2.0 API:
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.*;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

public class TopicTest {
    public static void main(String[] args) {
        try {
            TopicTest topicTest = new TopicTest();
            topicTest.createTopics(10, (short) 1);
        } catch (IOException e) {
            // Report the failure instead of swallowing it silently.
            e.printStackTrace();
        }
    }

    // Kafka defines a Request/Response class pair for every operation,
    // such as the CreateTopicsRequest and CreateTopicsResponse used here.
    public void createTopics(int partitions, short replicationFactor) throws IOException {
        Map<String, CreateTopicsRequest.TopicDetails> topics = new HashMap<>();
        topics.put("newtopic", new CreateTopicsRequest.TopicDetails(partitions, replicationFactor));
        CreateTopicsRequest request = new CreateTopicsRequest.Builder(topics, 60000).build();
        ByteBuffer response = send("localhost", 9092, request, ApiKeys.CREATE_TOPICS);
        CreateTopicsResponse.parse(response, request.version());
    }

    // Opens a connection for a single request and always closes it afterwards.
    private ByteBuffer send(String host, int port, AbstractRequest request, ApiKeys apiKeys) throws IOException {
        Socket socket = new Socket(host, port);
        try {
            return send(request, apiKeys, socket);
        } finally {
            socket.close();
        }
    }

    private ByteBuffer send(AbstractRequest request, ApiKeys apiKeys, Socket socket) throws IOException {
        RequestHeader header = new RequestHeader(apiKeys, request.version(), "client", 0);
        byte[] response = issueRequestAndWaitForResponse(socket, header, request);
        ByteBuffer responseBuffer = ByteBuffer.wrap(response);
        // Consume the response header so the buffer is positioned at the response body.
        ResponseHeader.parse(responseBuffer);
        return responseBuffer;
    }

    private byte[] issueRequestAndWaitForResponse(Socket socket, RequestHeader header, AbstractRequest request) throws IOException {
        DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
        byte[] serializedRequest = request.serialize(header).array();
        // Every message is sent as a 4-byte length followed by the serialized bytes.
        dos.writeInt(serializedRequest.length);
        dos.write(serializedRequest);
        dos.flush();
        // Do not close dos here: closing a socket's output stream closes the socket itself,
        // so the read below would fail. The caller closes the socket.
        DataInputStream dis = new DataInputStream(socket.getInputStream());
        byte[] response = new byte[dis.readInt()];
        dis.readFully(response);
        return response;
    }
}
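The wire framing used above (a 4-byte length followed by the serialized bytes) can be tried out without a broker. The following is only a minimal sketch of that framing; the class LengthPrefixedFrame and its methods are hypothetical names made up for this illustration and are not part of the Kafka client.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not a Kafka class): shows 4-byte length-prefixed framing.
final class LengthPrefixedFrame {

    /** Prepends a 4-byte big-endian length to the payload. */
    static byte[] frame(byte[] payload) {
        ByteBuffer buffer = ByteBuffer.allocate(4 + payload.length);
        buffer.putInt(payload.length); // ByteBuffer writes big-endian by default
        buffer.put(payload);
        return buffer.array();
    }

    /** Reads the length prefix and returns exactly that many payload bytes. */
    static byte[] unframe(byte[] framed) {
        ByteBuffer buffer = ByteBuffer.wrap(framed);
        int length = buffer.getInt();
        if (length < 0 || length > buffer.remaining()) {
            throw new IllegalArgumentException("Invalid frame length: " + length);
        }
        byte[] payload = new byte[length];
        buffer.get(payload);
        return payload;
    }

    public static void main(String[] args) {
        byte[] framed = frame("hello".getBytes(StandardCharsets.UTF_8));
        // 4 prefix bytes plus 5 payload bytes
        System.out.println("framed length = " + framed.length);
        System.out.println(new String(unframe(framed), StandardCharsets.UTF_8));
    }
}
```

In TopicTest the same job is done by dos.writeInt(...) on the way out and dis.readInt() followed by dis.readFully(...) on the way back.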
The book 《Apache Kafka实战》 builds the request like this:
ByteBuffer buffer=ByteBuffer.allocate(header.sizeOf()+request.sizeOf());
header.writeTo(buffer);
request.writeTo(buffer);
byte[] serializedRequest=buffer.array();
In my testing, the 2.0 API has no sizeOf or writeTo methods.
Use request.serialize(header).array() instead.
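To make it clearer what ends up at the front of the serialized bytes, here is a hedged sketch that encodes a request header by hand. It assumes the version 1 request header layout described in the Kafka protocol guide (api_key INT16, api_version INT16, correlation_id INT32, client_id as a length-prefixed string) and the CreateTopics API key 19. The class RequestHeaderLayout is a hypothetical name for illustration, and the API version passed in main is an arbitrary example value, not necessarily the one the 2.0 client would choose.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration of the request header v1 layout; not a Kafka class.
final class RequestHeaderLayout {

    // API key of CreateTopics in the Kafka protocol.
    static final short CREATE_TOPICS_API_KEY = 19;

    /** Encodes api_key, api_version, correlation_id and client_id in that order. */
    static byte[] encodeHeader(short apiKey, short apiVersion, int correlationId, String clientId) {
        byte[] id = clientId.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(2 + 2 + 4 + 2 + id.length);
        buffer.putShort(apiKey);            // INT16 api_key
        buffer.putShort(apiVersion);        // INT16 api_version
        buffer.putInt(correlationId);       // INT32 correlation_id
        buffer.putShort((short) id.length); // INT16 length of client_id
        buffer.put(id);                     // client_id bytes (UTF-8)
        return buffer.array();
    }

    public static void main(String[] args) {
        // Version 0 is only an example value chosen for this sketch.
        byte[] header = encodeHeader(CREATE_TOPICS_API_KEY, (short) 0, 0, "client");
        System.out.println("header bytes = " + header.length);
    }
}
```

With the client id "client" and correlation id 0 used in TopicTest, the header occupies 14 bytes, and the request body follows directly after it.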
After running it, list the topics with the script:
[email protected]:/usr/kafka# bin/kafka-topics.sh --list --zookeeper localhost:2181
newtopic
2. A strange Kafka + Zookeeper problem I ran into:
Zookeeper started successfully, but Kafka would not start and reported a timeout:
[2018-08-24 07:59:48,997] INFO Socket connection established to localhost/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2018-08-24 07:59:55,000] WARN Client session timed out, have not heard from server in 6003ms for sessionid 0x0 (org.apache.zookeeper.ClientCnxn)
[2018-08-24 07:59:55,009] INFO Client session timed out, have not heard from server in 6003ms for sessionid 0x0, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
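zookeeper.connection.timeout.ms was already set to 10000; raising it to 60000 still produced the same error.
The answer I found on Baidu was that the data volume was too large, but this machine held very little data.
Since everything was running locally, a network problem seemed unlikely, so I suspected that the port could not be reached. However, netstat cannot be used under WSL to inspect port usage (its output is empty; I do not know whether this is specific to my setup or happens everywhere).
So I ran netstat -ano in cmd and found two processes holding port 2181 (only one Zookeeper had actually been started). Looking up the PIDs in the Windows resource manager showed that both processes were java.
Back in WSL, I ran pkill java to kill all leftover processes, then restarted Zookeeper and Kafka, and both started without problems.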
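When netstat gives no output, one way to check from inside WSL whether something on port 2181 actually answers is to send Zookeeper a four-letter command such as ruok, to which a healthy server replies imok. The sketch below assumes that four-letter commands are enabled on the server (newer Zookeeper versions restrict them through a whitelist setting); the class ZkProbe and its method are hypothetical names for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical probe: sends a four-letter command and returns the raw reply.
final class ZkProbe {

    static String fourLetterWord(String host, int port, String command, int timeoutMs) throws IOException {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            socket.setSoTimeout(timeoutMs); // fail instead of hanging if nothing answers
            OutputStream out = socket.getOutputStream();
            out.write(command.getBytes(StandardCharsets.US_ASCII));
            out.flush();
            // The server is expected to reply and then close the connection.
            InputStream in = socket.getInputStream();
            ByteArrayOutputStream reply = new ByteArrayOutputStream();
            byte[] chunk = new byte[256];
            int n;
            while ((n = in.read(chunk)) != -1) {
                reply.write(chunk, 0, n);
            }
            return new String(reply.toByteArray(), StandardCharsets.US_ASCII);
        }
    }

    public static void main(String[] args) {
        try {
            System.out.println("reply: " + fourLetterWord("localhost", 2181, "ruok", 3000));
        } catch (IOException e) {
            System.out.println("no usable answer on port 2181: " + e);
        }
    }
}
```

A connection that is accepted but never answered (a read timeout here) would match the symptom in the log above, where the socket connection is established and the session then times out.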
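Before restarting Zookeeper, one could also check whether port 2181 is still held by a leftover process by trying to bind it. This is only a rough sketch: the class PortCheck is a hypothetical name, and because two processes were seen on the same port here, it is an open assumption that a bind attempt inside WSL would have detected the conflict in this particular case.

```java
import java.io.IOException;
import java.net.ServerSocket;

// Hypothetical helper: reports whether a local TCP port can currently be bound.
final class PortCheck {

    /** Returns true if binding the port fails, which usually means it is in use. */
    static boolean isPortInUse(int port) {
        try (ServerSocket probe = new ServerSocket(port)) {
            return false; // bind succeeded, so nothing else was listening
        } catch (IOException e) {
            // Usually "address already in use"; a permission error also lands here.
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("port 2181 in use: " + isPortInUse(2181));
    }
}
```

If this reports true while no Zookeeper is supposed to be running, a stale java process is a likely suspect.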