Sending and Receiving Messages in Kafka with a Custom Serializer

Serializers

  • Kafka ships with built-in serializers for simple types such as int, short, long, float, double, String, and byte[], but these fall short for structured data. For example, to send merchant records that carry user_id, age_range, gender, merchant_id, and label fields, as in the sample below, you need a custom serializer (a baseline sketch using only the built-in String serializer follows the sample data).

    user_id,age_range,gender,merchant_id,label
    163968,0,0,4378,-1
    163968,0,0,2300,-1
    163968,0,0,1551,-1
    163968,0,0,4343,-1
    163968,0,0,4911,-1
    163968,0,0,4043,-1
    163968,0,0,2138,-1
    
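  • For contrast, without a custom serializer each CSV row would have to travel as one opaque String and be re-split by every consumer. A minimal baseline sketch (the class name PlainStringProducer is mine; broker list and topic name are reused from the full example later in this article):

    package streaming.kafka;

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // Baseline without a custom serializer: the whole CSV row travels as one
    // opaque String, and the consumer must split it again.
    public class PlainStringProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "node02:9092,node03:9092,node04:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            producer.send(new ProducerRecord<>("topic1223", "163968", "163968,0,0,4378,-1"));
            producer.close();
        }
    }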

Preparation

  • We use fastjson to serialize objects into a byte stream; add the fastjson and kafka dependencies to pom.xml:

    		<dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.12</artifactId>
                <version>0.11.0.3</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.62</version>
            </dependency>
    
  • Create the Person class:

    package streaming.kafka.bean;
    
    /**
     * project: SparkStudy
     * package: streaming.kafka.bean
     * author: Willi Wei
     * date: 2019-12-23 15:11:49
     * description:
     */
    public class Person {
        private String userId;
        private String ageRange;
        private String gender;
        private String merchantId;
        private String label;
    
        public Person() {
        }
    
        public Person(String userId, String ageRange, String gender, String merchantId, String label) {
            this.userId = userId;
            this.ageRange = ageRange;
            this.gender = gender;
            this.merchantId = merchantId;
            this.label = label;
        }
    
        public String getUserId() {
            return userId;
        }
    
        public void setUserId(String userId) {
            this.userId = userId;
        }
    
        public String getAgeRange() {
            return ageRange;
        }
    
        public void setAgeRange(String ageRange) {
            this.ageRange = ageRange;
        }
    
        public String getGender() {
            return gender;
        }
    
        public void setGender(String gender) {
            this.gender = gender;
        }
    
        public String getMerchantId() {
            return merchantId;
        }
    
        public void setMerchantId(String merchantId) {
            this.merchantId = merchantId;
        }
    
        public String getLabel() {
            return label;
        }
    
        public void setLabel(String label) {
            this.label = label;
        }
    
        @Override
        public String toString() {
            return "Person{" +
                    "userId='" + userId + '\'' +
                    ", ageRange='" + ageRange + '\'' +
                    ", gender='" + gender + '\'' +
                    ", merchantId='" + merchantId + '\'' +
                    ", label='" + label + '\'' +
                    '}';
        }
    }
    
  • Create the serializer and the deserializer; both delegate the actual work to fastjson (a local round-trip check follows the code):

    package streaming.kafka.bean;
    
    import com.alibaba.fastjson.JSON;
    import org.apache.kafka.common.serialization.Serializer;
    
    import java.util.Map;
    
    /**
     * project: SparkStudy
     * package: streaming.kafka.bean
     * author: Willi Wei
     * date: 2019-12-23 15:22:43
     * description: serializes a Person into JSON bytes with fastjson
     */
    public class PersonSerializer implements Serializer<Person> {
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            // nothing to configure
        }

        @Override
        public byte[] serialize(String topic, Person person) {
            // fastjson renders the bean as UTF-8 JSON bytes
            return JSON.toJSONBytes(person);
        }

        @Override
        public void close() {
            // no resources to release
        }
    }
    
    package streaming.kafka.bean;
    import com.alibaba.fastjson.JSON;
    import org.apache.kafka.common.serialization.Deserializer;
    
    
    import java.util.Map;
    
    /**
     * project: SparkStudy
     * package: streaming.kafka.bean
     * author: Willi Wei
     * date: 2019-12-23 15:13:53
     * description: deserializes JSON bytes back into a Person with fastjson
     */
    public class PersonDeserializer implements Deserializer<Person> {
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            // nothing to configure
        }

        @Override
        public Person deserialize(String topic, byte[] data) {
            // fastjson parses the JSON bytes back into a Person bean
            return JSON.parseObject(data, Person.class);
        }

        @Override
        public void close() {
            // no resources to release
        }
    }
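
  • Before wiring these classes into Kafka, it is worth sanity-checking the round trip locally. A minimal sketch (the test class name is mine; the exact JSON field order depends on fastjson's defaults):

    package streaming.kafka.bean;

    // Local round-trip check for PersonSerializer/PersonDeserializer,
    // without touching a broker.
    public class SerdeRoundTripCheck {
        public static void main(String[] args) {
            Person person = new Person("163968", "0", "0", "4378", "-1");

            PersonSerializer serializer = new PersonSerializer();
            PersonDeserializer deserializer = new PersonDeserializer();

            byte[] bytes = serializer.serialize("topic1223", person);
            System.out.println(new String(bytes));  // the JSON fastjson produced

            Person copy = deserializer.deserialize("topic1223", bytes);
            System.out.println(copy);               // uses Person#toString()

            serializer.close();
            deserializer.close();
        }
    }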
    

Create the producer and consumer

  • Create the producer; it reads a local CSV file and sends each row as a Person (a constructor-based variant follows the code):

    package streaming.kafka;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import streaming.kafka.bean.Person;
    
    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.util.Properties;
    
    /**
     * project: SparkStudy
     * package: streaming.kafka
     * author: Willi Wei
     * date: 2019-12-22 20:15:26
     * description: sends serialized Person data read from a local CSV file
     */
    public class JavaKafkaProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "node02:9092,node03:9092,node04:9092");
            props.put("acks", "all");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("linger.ms", 0);
            props.put("buffer.memory", 33554432);
            // Key serializer: Kafka's built-in String serializer
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // Value serializer: the custom PersonSerializer defined above
            props.put("value.serializer", "streaming.kafka.bean.PersonSerializer");
            KafkaProducer<String, Person> producer = new KafkaProducer<String, Person>(props);
            try (BufferedReader reader = new BufferedReader(new FileReader("data/test2000.csv"))) {
                reader.readLine(); // skip the CSV header line
                String line;
                while ((line = reader.readLine()) != null) {
                    // CSV rows are comma-separated
                    String[] items = line.split(",");
                    Person person = new Person();
                    person.setUserId(items[0]);
                    person.setAgeRange(items[1]);
                    person.setGender(items[2]);
                    person.setMerchantId(items[3]);
                    // rows without a label column get a placeholder value
                    if (items.length == 4) {
                        person.setLabel("null");
                    } else {
                        person.setLabel(items[4]);
                    }
                    producer.send(new ProducerRecord<String, Person>("topic1223", person.getUserId(), person));
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            producer.close();
        }
    }
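
  • Instead of registering the serializer by its fully qualified class name, the client API also accepts serializer instances in the KafkaProducer constructor, which gets the value type checked at compile time. A minimal sketch (the class name JavaKafkaProducerAlt is mine):

    package streaming.kafka;

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    import streaming.kafka.bean.Person;
    import streaming.kafka.bean.PersonSerializer;

    // Same wiring as JavaKafkaProducer, but the serializers are constructor
    // arguments rather than "key.serializer"/"value.serializer" strings.
    public class JavaKafkaProducerAlt {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "node02:9092,node03:9092,node04:9092");
            props.put("acks", "all");

            KafkaProducer<String, Person> producer =
                    new KafkaProducer<>(props, new StringSerializer(), new PersonSerializer());
            Person person = new Person("163968", "0", "0", "4378", "-1");
            producer.send(new ProducerRecord<>("topic1223", person.getUserId(), person));
            producer.close();
        }
    }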
    
  • Create the consumer (a manual-commit variant follows the code):

    package streaming.kafka;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import streaming.kafka.bean.Person;
    
    import java.util.Arrays;
    import java.util.Properties;
    
    /**
     * project: SparkStudy
     * package: streaming.kafka
     * author: Willi Wei
     * date: 2019-12-23 15:51:37
     * description:
     */
    public class JavaKafkaConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "node02:9092,node03:9092,node04:9092");
            props.put("group.id", "group01");
            props.put("enable.auto.commit", "false");
            props.put("auto.offset.reset", "earliest");
            // has no effect while enable.auto.commit is false
            props.put("auto.commit.interval.ms", "1000");
            // Key deserializer: Kafka's built-in String deserializer
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            // Value deserializer: the custom PersonDeserializer defined above
            props.put("value.deserializer", "streaming.kafka.bean.PersonDeserializer");
            KafkaConsumer<String, Person> consumer = new KafkaConsumer<String, Person>(props);
            consumer.subscribe(Arrays.asList("topic1223"));
            try {
                while(true){
                    ConsumerRecords<String, Person> records = consumer.poll(5000);
                    System.out.println("Records received in this batch: " + records.count());
                    for (ConsumerRecord<String, Person> record : records) {
                        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value().toString());
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                consumer.close();
            }
        }
    }
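
  • Because enable.auto.commit is false, the loop above never commits offsets, so with auto.offset.reset=earliest every restart replays the topic from the beginning. To persist progress, commit manually after each batch. A minimal variant (the class name is mine):

    package streaming.kafka;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import streaming.kafka.bean.Person;

    import java.util.Arrays;
    import java.util.Properties;

    // Same consumer, but offsets are committed synchronously after each batch
    // so a restart resumes where the group left off.
    public class JavaKafkaConsumerManualCommit {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "node02:9092,node03:9092,node04:9092");
            props.put("group.id", "group01");
            props.put("enable.auto.commit", "false");
            props.put("auto.offset.reset", "earliest");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "streaming.kafka.bean.PersonDeserializer");

            KafkaConsumer<String, Person> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList("topic1223"));
            try {
                while (true) {
                    ConsumerRecords<String, Person> records = consumer.poll(5000);
                    for (ConsumerRecord<String, Person> record : records) {
                        System.out.printf("offset = %d, key = %s, value = %s%n",
                                record.offset(), record.key(), record.value());
                    }
                    consumer.commitSync(); // persist the offsets just consumed
                }
            } finally {
                consumer.close();
            }
        }
    }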
    

    Sample output:

    offset = 14107, key = 372096, value = Person{userId='372096', ageRange='0', gender='0', merchantId='4898', label='-1'}
    offset = 14108, key = 372096, value = Person{userId='372096', ageRange='0', gender='0', merchantId='1405', label='-1'}
    offset = 14109, key = 372096, value = Person{userId='372096', ageRange='0', gender='0', merchantId='4760', label='-1'}
    offset = 14110, key = 372096, value = Person{userId='372096', ageRange='0', gender='0', merchantId='1953', label='-1'}
    offset = 14111, key = 372096, value = Person{userId='372096', ageRange='0', gender='0', merchantId='1036', label='-1'}
    offset = 14112, key = 372096, value = Person{userId='372096', ageRange='0', gender='0', merchantId='567', label='-1'}
    offset = 14113, key = 372096, value = Person{userId='372096', ageRange='0', gender='0', merchantId='4676', label='-1'}
    offset = 14114, key = 372096, value = Person{userId='372096', ageRange='0', gender='0', merchantId='1862', label='-1'}
    offset = 14115, key = 372096, value = Person{userId='372096', ageRange='0', gender='0', merchantId='2674', label='-1'}
    offset = 14116, key = 372096, value = Person{userId='372096', ageRange='0', gender='0', merchantId='4920', label='-1'}
    offset = 14117, key = 372096, value = Person{userId='372096', ageRange='0', gender='0', merchantId='517', label='-1'}
    