
Kafka 2.0 - Serialization and Deserialization (Part 7)


Overview: first a look at Kafka's simple built-in serialization options and how they are implemented; a small custom-serialization program built with protobuf then follows in the next article.

As in the example programs from earlier articles in this series, the serialization settings we have been using look like this:

// Producer serializer configuration
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// Consumer deserializer configuration
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

A message is serialized on the producer side, sent to the Kafka cluster, and then deserialized on the consumer side to recover the data; that is the whole journey of a message. The serializers we have configured so far have always been StringSerializer and StringDeserializer.
Let's take a look at the source code of StringSerializer:

public class StringSerializer implements Serializer<String> {
    private String encoding = "UTF8";

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
        Object encodingValue = configs.get(propertyName);
        if (encodingValue == null)
            encodingValue = configs.get("serializer.encoding");
        if (encodingValue instanceof String)
            encoding = (String) encodingValue;
    }

    @Override
    public byte[] serialize(String topic, String data) {
        try {
            if (data == null)
                return null;
            else
                return data.getBytes(encoding);
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
        }
    }

    @Override
    public void close() {
        // nothing to do
    }
}

From the source it is easy to see that it is essentially just a thin wrapper around Java's String.getBytes(encoding).
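
The matching StringDeserializer goes the other way, through new String(data, encoding). A trimmed sketch of it (its configure method mirrors the serializer's and is omitted here):

public class StringDeserializer implements Deserializer<String> {
    private String encoding = "UTF8";

    @Override
    public String deserialize(String topic, byte[] data) {
        try {
            if (data == null)
                return null;
            else
                return new String(data, encoding);
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding);
        }
    }
}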
Beyond these string (de)serializers, Kafka ships a number of others, all aimed at simple types:
(Figure: Kafka's built-in serializers. In Kafka 2.0 these are ByteArraySerializer, ByteBufferSerializer, BytesSerializer, DoubleSerializer, FloatSerializer, IntegerSerializer, LongSerializer, ShortSerializer and StringSerializer, all in org.apache.kafka.common.serialization.)

In real applications, however, messages are almost always complex custom types, so the built-in serializers do not help. How, then, do we serialize complex types?

The first option that comes to mind is JSON: convert the object into a JSON string and keep using StringSerializer and StringDeserializer. This approach is used a lot in practice.
The second option is the JDK's built-in serialization. It works, but it is inefficient: serialization is slow, and the resulting byte array is large, often about twice the size of the object. Use it only when performance does not matter.
The third option is one of the popular, efficient serialization frameworks such as protobuf, Thrift or Avro. Taking protobuf as an example, both its serialization speed and its encoded size are excellent, but you have to generate the corresponding protobuf classes, so it is not quite as convenient as JSON.
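
As a quick preview of the protobuf approach: every protobuf-generated message class has a toByteArray() method and a static parseFrom(byte[]) method, so wrapping one in Kafka's Serializer interface takes only a few lines. A minimal sketch, assuming a generated class UserProto.User (the class name is hypothetical):

public class ProtobufUserSerializer implements Serializer<UserProto.User> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {}

    @Override
    public byte[] serialize(String topic, UserProto.User data) {
        // A generated protobuf message knows how to encode itself
        return data == null ? null : data.toByteArray();
    }

    @Override
    public void close() {}
}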

Below is a full example program for each of the first two options.

1. JSON

/** Producer **/
public class JSONSerializerProducer {

    public static final String TOPIC_NAME = "producer-0"; 
    private  static Properties props = new Properties();

    static{
         props.put("bootstrap.servers", "192.168.1.3:9092,192.168.1.128:9092,192.168.1.130:9092");
         props.put("acks", "all");
         props.put("retries", 0);
         props.put("batch.size", 16384);
         props.put("linger.ms", 1);
         props.put("buffer.memory", 33554432);
         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    }

    public static void main(String[] args) {
         Producer<String, String> producer = new KafkaProducer<>(props);

         User user = new User(101L,"kafka","aaa@qq.com",1);

         producer.send(new ProducerRecord<String, String>(TOPIC_NAME, Long.toString(user.getId()), JSON.toJSONString(user)));
         producer.close();
    }
}
/** Consumer */
public class JSONDeserializerConsumer {
    private  static Properties props = new Properties();

    private static boolean isClose = false;

    static{
         props.put("bootstrap.servers", "192.168.1.3:9092,192.168.1.128:9092,192.168.1.130:9092");
         props.put("group.id", "test");
         props.put("enable.auto.commit", "true");
         props.put("auto.commit.interval.ms", "1000");
         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    }

    public static void main(String[] args) {
         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
         consumer.subscribe(Arrays.asList(JSONSerializerProducer.TOPIC_NAME));
         while (!isClose) {
             ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
             for (ConsumerRecord<String, String> record : records)
                 System.out.printf("key = %s, value = %s%n", record.key(), JSON.parseObject(record.value(), User.class));
         }

         consumer.close();
    }
}
/** The value object (VO) carried in the message **/
public class User implements Serializable{
    private static final long serialVersionUID = 468062760765055608L;

    private Long id;

    private String name;

    private String email;
    /** {0: male, 1: female} **/
    private Integer sex;

    public User() {}

    public User(Long id, String name, String email, Integer sex) {
        super();
        this.id = id;
        this.name = name;
        this.email = email;
        this.sex = sex;
    }

    @Override
    public String toString() {
        return "[ID:" + id + ", 姓名:" + name + ", 性别:" + (sex==0?"男":"女") + ", 邮箱:" + email + "]";
    }

    /********************** getter & setter******************************/
    public Long getId() { return id; }
    public void setId(Long id) { this.id = id; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public String getEmail() { return email; }
    public void setEmail(String email) { this.email = email; }

    public Integer getSex() { return sex; }
    public void setSex(Integer sex) { this.sex = sex; }
    /********************** getter & setter******************************/
}
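
As a variation on this example, the fastjson call can also be folded into a dedicated deserializer, so the consumer is typed as KafkaConsumer<String, User> and the poll loop receives User objects directly. A minimal sketch (this class is our own illustration, not part of the sample repository):

public class JSONUserDeserializer implements Deserializer<User> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {}

    @Override
    public User deserialize(String topic, byte[] data) {
        // Decode the UTF-8 JSON text and map it onto the User bean
        return data == null ? null : JSON.parseObject(new String(data, StandardCharsets.UTF_8), User.class);
    }

    @Override
    public void close() {}
}

With this in place, value.deserializer would point at this class instead of StringDeserializer.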

Sample source code: https://github.com/Mryangtaofang/sample

The message received by the consumer:
(Figure: the consumer's console output showing the deserialized User object.)

2. JDK serialization

/** Producer - using JDK serialization */
public class JDKSerializerProducer {

    public static final String TOPIC_NAME = "producer-0"; 
    private  static Properties props = new Properties();

    static{
         props.put("bootstrap.servers", "192.168.1.3:9092,192.168.1.128:9092,192.168.1.130:9092");
         props.put("acks", "all");
         props.put("retries", 0);
         props.put("batch.size", 16384);
         props.put("linger.ms", 1);
         props.put("buffer.memory", 33554432);
         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         props.put("value.serializer", "com.yang.kafka.serialization.JDKSerializer");
    }

    public static void main(String[] args) {
         Producer<String, User> producer = new KafkaProducer<>(props);

         User user = new User(101L,"kafka","aaa@qq.com",1);
         producer.send(new ProducerRecord<String, User>(TOPIC_NAME, Long.toString(user.getId()), user));

         producer.close();
    }
}

The custom serializer:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Map;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

/**
 * Kafka message serialization implemented with JDK serialization.
 */
public class JDKSerializer implements Serializer<Serializable> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to configure
    }

    @Override
    public byte[] serialize(String topic, Serializable data) {
        if (data == null)
            return null;

        // Build fresh streams on every call: sharing a single
        // ByteArrayOutputStream across calls would keep accumulating
        // the bytes of earlier messages.
        try (ByteArrayOutputStream byteArrStream = new ByteArrayOutputStream();
             ObjectOutputStream objectStream = new ObjectOutputStream(byteArrStream)) {
            objectStream.writeObject(data);
            objectStream.flush();
            return byteArrStream.toByteArray();
        } catch (IOException e) {
            throw new SerializationException("Error when serializing object to byte[]", e);
        }
    }

    @Override
    public void close() {
        // nothing to do
    }
}
/**
 * Consumer - using JDK deserialization
 */
public class JDKDeserializerConsumer {
    private  static Properties props = new Properties();

    private static boolean isClose = false;

    static{
         props.put("bootstrap.servers", "192.168.1.3:9092,192.168.1.128:9092,192.168.1.130:9092");
         props.put("group.id", "test");
         props.put("enable.auto.commit", "true");
         props.put("auto.commit.interval.ms", "1000");
         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         props.put("value.deserializer", "com.yang.kafka.serialization.JDKDeserializer");
    }

    public static void main(String[] args) {
         KafkaConsumer<String, User> consumer = new KafkaConsumer<>(props);
         consumer.subscribe(Arrays.asList(JDKSerializerProducer.TOPIC_NAME));
         while (!isClose) {
             ConsumerRecords<String, User> records = consumer.poll(Duration.ofMillis(100));
             for (ConsumerRecord<String, User> record : records)
                 System.out.printf("key = %s, value = %s%n", record.key(), record.value());
         }

         consumer.close();
    }
}

The custom deserializer:

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.util.Map;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

/**
 * Kafka message deserialization implemented with JDK serialization.
 */
public class JDKDeserializer implements Deserializer<Serializable> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to configure
    }

    @Override
    public Serializable deserialize(String topic, byte[] data) {
        if (data == null)
            return null;

        // Fresh streams per call, closed automatically by try-with-resources,
        // so no state is shared between messages.
        try (ByteArrayInputStream byteArrStream = new ByteArrayInputStream(data);
             ObjectInputStream objectStream = new ObjectInputStream(byteArrStream)) {
            return (Serializable) objectStream.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new SerializationException("Error when deserializing byte[] to object", e);
        }
    }

    @Override
    public void close() {
        // nothing to do
    }
}

The output is the same as with the JSON approach above.

To keep this article from getting too long, protobuf-based serialization is covered in the next article.
