kafka2.0-序列化与反序列化_07
概要:先讲讲kafka中简单的序列化方式,以及实现,然后再使用protobuf实现一个自定义序列化的小程序
正如之前文章中写过的例子程序,我们配置的序列化方式都是下面这样的。
//生产者的序列化配置
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//消费者的反序列化配置
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
消息在生产端序列化之后发送到kafka集群，消费端收到消息后再反序列化取出数据，这就是消息流转的完整过程。我们之前一直配置的序列化方式都是StringSerializer和StringDeserializer。
我们来看看StringSerializer的源码,如下:
// Kafka's built-in StringSerializer (quoted from the Kafka client source):
// converts a String value to bytes using a configurable character encoding.
public class StringSerializer implements Serializer<String> {
// Encoding used by serialize(); "UTF8" is Java's canonical name for UTF-8.
private String encoding = "UTF8";
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// Resolve the encoding in priority order: the key/value-specific property
// first, then the generic "serializer.encoding", else keep the UTF-8 default.
String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
Object encodingValue = configs.get(propertyName);
if (encodingValue == null)
encodingValue = configs.get("serializer.encoding");
if (encodingValue instanceof String)
encoding = (String) encodingValue;
}
@Override
public byte[] serialize(String topic, String data) {
try {
// A null value passes through as null (Kafka treats it as a tombstone).
if (data == null)
return null;
else
// The core of the class: plain String.getBytes(encoding).
return data.getBytes(encoding);
} catch (UnsupportedEncodingException e) {
throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
}
}
@Override
public void close() {
// nothing to do
}
}
通过上面的源码不难发现，StringSerializer的本质就是调用Java中String对象的getBytes方法，只是在其上做了一层简单的封装（支持通过配置指定字符编码）。
除此之外，kafka还在org.apache.kafka.common.serialization包下提供了其他一些针对简单类型的序列化器，例如Integer、Long、Double、ByteArray等对应的Serializer/Deserializer。
但是在我们实际的应用中,我们的消息基本上都是复杂的自定义类型,所以这些并不适用。那么如何序列化复杂类型呢?
第一种容易想到的方案是JSON：将复杂类型的对象转化为JSON字符串，然后仍通过StringSerializer和StringDeserializer进行序列化和反序列化，这种方式在实际应用中也经常用到。
第二种方案就是,使用jdk自带的序列化,这种方案可用,但是效率不高,因为jdk自带的序列化有两个毛病,一方面是序列化速度慢,其次是序列化之后,字节数组太大,基本是对象的两倍。所以在不考虑高效的情况下,可以使用。
第三种方案就是,使用当前业界一些流行的,高效的序列化方案,比如protobuf,Thrift,Avro等等。以protobuf为例,其序列化速度和字节码大小都很不错,但是需要你去生成对应的protobuf类,用起来可能没有json那么方便。
以下针对这三种方案,分别写一个例子程序。
1. JSON
/** Producer: publishes a {@link User} to Kafka as a JSON string (key = user id). */
public class JSONSerializerProducer {
    public static final String TOPIC_NAME = "producer-0";

    /** Shared client configuration, assembled once when the class loads. */
    private static final Properties CONFIG = new Properties();
    static {
        CONFIG.put("bootstrap.servers", "192.168.1.3:9092,192.168.1.128:9092,192.168.1.130:9092");
        CONFIG.put("acks", "all");
        CONFIG.put("retries", 0);
        CONFIG.put("batch.size", 16384);
        CONFIG.put("linger.ms", 1);
        CONFIG.put("buffer.memory", 33554432);
        // Both key and value travel as plain strings; the value is a JSON document.
        CONFIG.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        CONFIG.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    }

    public static void main(String[] args) {
        User user = new User(101L, "kafka", "aaa@qq.com", 1);
        Producer<String, String> producer = new KafkaProducer<>(CONFIG);
        String key = Long.toString(user.getId());
        String payload = JSON.toJSONString(user);
        producer.send(new ProducerRecord<String, String>(TOPIC_NAME, key, payload));
        producer.close();
    }
}
/** Consumer: reads JSON strings from Kafka and rebuilds {@link User} objects. */
public class JSONDeserializerConsumer {
    /** Shared client configuration, assembled once when the class loads. */
    private static final Properties CONFIG = new Properties();
    /** Flip to true to make the poll loop exit. */
    private static boolean isClose = false;
    static {
        CONFIG.put("bootstrap.servers", "192.168.1.3:9092,192.168.1.128:9092,192.168.1.130:9092");
        CONFIG.put("group.id", "test");
        CONFIG.put("enable.auto.commit", "true");
        CONFIG.put("auto.commit.interval.ms", "1000");
        CONFIG.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        CONFIG.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    }

    public static void main(String[] args) {
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(CONFIG);
        consumer.subscribe(Arrays.asList(JSONSerializerProducer.TOPIC_NAME));
        while (!isClose) {
            ConsumerRecords<String, String> batch = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : batch) {
                // Turn the JSON text back into a User before printing it.
                User value = JSON.parseObject(record.value(), User.class);
                System.out.printf("key = %s, value = %s%n", record.key(), value);
            }
        }
        consumer.close();
    }
}
/** Message payload (VO) transferred through the MQ. */
public class User implements Serializable{
private static final long serialVersionUID = 468062760765055608L;
private Long id;
private String name;
private String email;
/** gender flag: {0: 男 (male), 1: 女 (female)} **/
private Integer sex;
public User() {}
public User(Long id, String name, String email, Integer sex) {
super();
this.id = id;
this.name = name;
this.email = email;
this.sex = sex;
}
@Override
public String toString() {
// FIX: the original evaluated `sex == 0`, auto-unboxing the Integer and
// throwing NullPointerException for a User whose sex was never set
// (e.g. one built with the no-arg constructor). Guard against null first.
String gender = (sex == null) ? "未知" : (sex == 0 ? "男" : "女");
return "[ID:" + id + ", 姓名:" + name + ", 性别:" + gender + ", 邮箱:" + email + "]";
}
/********************** getter & setter******************************/
public Long getId() { return id; }
public void setId(Long id) { this.id = id; }
public String getName() { return name; }
public void setName(String name) { this.name = name; }
public String getEmail() { return email; }
public void setEmail(String email) { this.email = email; }
public Integer getSex() { return sex; }
public void setSex(Integer sex) { this.sex = sex; }
/********************** getter & setter******************************/
}
消费者接收到的消息:
2. JDK序列化
/** Producer: ships a {@link User} through Kafka using JDK object serialization. */
public class JDKSerializerProducer {
    public static final String TOPIC_NAME = "producer-0";

    /** Shared client configuration, assembled once when the class loads. */
    private static final Properties CONFIG = new Properties();
    static {
        CONFIG.put("bootstrap.servers", "192.168.1.3:9092,192.168.1.128:9092,192.168.1.130:9092");
        CONFIG.put("acks", "all");
        CONFIG.put("retries", 0);
        CONFIG.put("batch.size", 16384);
        CONFIG.put("linger.ms", 1);
        CONFIG.put("buffer.memory", 33554432);
        CONFIG.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Values are encoded by our custom JDK-serialization serializer.
        CONFIG.put("value.serializer", "com.yang.kafka.serialization.JDKSerializer");
    }

    public static void main(String[] args) {
        User user = new User(101L, "kafka", "aaa@qq.com", 1);
        Producer<String, User> producer = new KafkaProducer<>(CONFIG);
        producer.send(new ProducerRecord<String, User>(TOPIC_NAME, Long.toString(user.getId()), user));
        producer.close();
    }
}
自定义的序列化器:
/**
 * Kafka message serializer backed by JDK object serialization.
 * Works for any {@link Serializable} value; slow and verbose compared to
 * protobuf/Avro, but needs no extra dependencies or generated classes.
 */
public class JDKSerializer implements Serializer<Serializable>{
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// Stateless: nothing to configure.
}
@Override
public byte[] serialize(String topic, Serializable data) {
// Kafka convention: a null value serializes to null (tombstone).
if (data == null)
return null;
// FIX: the original cached one ByteArrayOutputStream in a field and never
// reset it, so every message's payload accumulated the bytes of ALL
// previously serialized messages. Fresh per-call streams (closed by
// try-with-resources) make each payload independent and the class
// safe for reuse.
try (ByteArrayOutputStream byteArrStream = new ByteArrayOutputStream();
ObjectOutputStream objectStream = new ObjectOutputStream(byteArrStream)) {
objectStream.writeObject(data);
objectStream.flush();
return byteArrStream.toByteArray();
} catch (IOException e) {
// FIX: fail loudly instead of printing a stack trace and returning
// null, which Kafka would silently send as a tombstone record.
throw new RuntimeException("JDK serialization failed for topic " + topic, e);
}
}
@Override
public void close() {
// Stateless: nothing to release.
}
}
/**
 * Consumer that reads JDK-serialized {@link User} messages and prints them.
 */
public class JDKDeserializerConsumer {
private static Properties props = new Properties();
// Flip to true to make the poll loop exit.
private static boolean isClose = false;
static{
props.put("bootstrap.servers", "192.168.1.3:9092,192.168.1.128:9092,192.168.1.130:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// Values are decoded by our custom JDK-deserialization deserializer.
props.put("value.deserializer", "com.yang.kafka.serialization.JDKDeserializer");
}
public static void main(String args[]){
KafkaConsumer<String, User> consumer = new KafkaConsumer<>(props);
// FIX: subscribe via JDKSerializerProducer, this example's own producer.
// The original referenced JSONSerializerProducer.TOPIC_NAME — same string
// value ("producer-0"), but a copy-paste coupling to the wrong class.
consumer.subscribe(Arrays.asList(JDKSerializerProducer.TOPIC_NAME));
while (!isClose) {
ConsumerRecords<String, User> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, User> record : records)
System.out.printf("key = %s, value = %s%n", record.key(), record.value());
}
consumer.close();
}
}
自定义的反序列化器:
/**
 * Kafka message deserializer backed by JDK object deserialization.
 * Counterpart of {@code JDKSerializer}.
 *
 * SECURITY NOTE: Java-native deserialization of untrusted bytes is a known
 * remote-code-execution vector; only use this when the topic's producers are
 * trusted, or install an {@code ObjectInputFilter}.
 */
public class JDKDeserializer implements Deserializer<Serializable>{
@Override
public void configure(Map<String, ?> configs, boolean isKey) {}
@Override
public Serializable deserialize(String topic, byte[] data) {
// FIX: a null payload (tombstone) made the original blow up with an
// uncaught NullPointerException inside new ByteArrayInputStream(null).
if (data == null)
return null;
// FIX: the original stored its streams in instance fields, overwriting
// them on every call (leaking the previous pair — close() only released
// the last one) and making the class unsafe for concurrent use. Local
// streams in try-with-resources are closed per call.
try (ObjectInputStream objectStream = new ObjectInputStream(new ByteArrayInputStream(data))) {
return (Serializable) objectStream.readObject();
} catch (IOException | ClassNotFoundException e) {
// FIX: fail loudly instead of printing a stack trace and handing the
// application a silent null value.
throw new RuntimeException("JDK deserialization failed for topic " + topic, e);
}
}
@Override
public void close() {
// Stateless: nothing to release.
}
}
运行结果与上面的json方式的结果相同。
为了避免文章篇幅过大,采用protobuf实现序列化的方式在下一篇文章讲。
上一篇: Virtualbox中ubuntu配置静态ip地址及DNS
下一篇: Js逆向-滑动验证码图片还原