
Flink Reading and Writing Kafka


Create two Kafka topics: one to which the producer sends messages, and one that receives the messages written back after Flink has processed them.
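The two topics can be created with the Kafka AdminClient. Below is a minimal sketch, assuming the producer writes to a topic named test and Flink writes its output to test1, matching the code that follows; partition and replication settings are placeholders for a local demo.

package cn.oneseek;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Arrays;
import java.util.Properties;
public class CreateTopics {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Kafka broker address
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // one partition and replication factor 1 are enough for a local demo
            admin.createTopics(Arrays.asList(
                    new NewTopic("test", 1, (short) 1),
                    new NewTopic("test1", 1, (short) 1)
            )).all().get();
        }
    }
}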

Kafka producer Java code

package cn.oneseek;
import cn.oneseek.util.JsonData;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class Producer {
    public static void main(String[] args) {
        // configuration
        Properties props = new Properties();
        // Kafka broker address
        props.put("bootstrap.servers", "localhost:9092");
        // serializer classes for record keys and values
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        // create the producer instance
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        Map<String, String> map = new HashMap<>();
        map.put("single JSON object", JsonData.str1);
        map.put("nested JSON object", JsonData.str2);
        map.put("JSON array", JsonData.str3);
        map.put("nested JSON array", JsonData.str4);
        map.put("variant JSON (child is a single JSON object or a JSON array)", JsonData.str5);
        for (Map.Entry<String, String> entry : map.entrySet()) {
            ProducerRecord<String, String> record = new ProducerRecord<>("test", entry.getValue());
            // send the record to the "test" topic
            producer.send(record);
        }
        producer.close();
    }
}
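The JsonData class referenced above is not shown in the article. A hypothetical stand-in with five static JSON strings, matching the map entries used by the producer, might look like this (the sample values are invented for illustration only).

package cn.oneseek.util;
// Hypothetical sample data; the original article does not show the real JsonData contents.
public class JsonData {
    public static final String str1 = "{\"id\":1,\"name\":\"tom\"}";
    public static final String str2 = "{\"id\":2,\"user\":{\"name\":\"jerry\",\"age\":20}}";
    public static final String str3 = "[{\"id\":3},{\"id\":4}]";
    public static final String str4 = "{\"id\":5,\"items\":[{\"sku\":\"a\"},{\"sku\":\"b\"}]}";
    public static final String str5 = "{\"id\":6,\"child\":[{\"id\":7}]}";
}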

Kafka consumer Java code

package cn.oneseek;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class Consumer {
    public static void main(String[] args) {
        // configuration
        Properties props = new Properties();
        // Kafka broker address
        props.put("bootstrap.servers", "localhost:9092");
        // a consumer group must be specified
        props.put("group.id", "test-consumer-group");
        // deserializer classes for record keys and values
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        // create the consumer instance
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // subscribe to the "test1" topic (the output written by Flink)
        consumer.subscribe(Arrays.asList("test1"));
        // keep polling the broker for new records
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
}
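The Flink job itself is not reproduced in this section. A minimal sketch of the read-process-write pipeline between the two topics, assuming the legacy FlinkKafkaConsumer/FlinkKafkaProducer connector and a trivial string transformation as a placeholder for the real JSON processing, could look like this.

package cn.oneseek;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import java.util.Properties;
public class FlinkKafkaJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "flink-group");
        // read raw JSON strings from the "test" topic
        DataStream<String> source = env.addSource(
                new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), props));
        // placeholder transformation: trim whitespace; replace with the real JSON processing
        DataStream<String> processed = source.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) {
                return value.trim();
            }
        });
        // write the processed strings to the "test1" topic
        processed.addSink(
                new FlinkKafkaProducer<>("test1", new SimpleStringSchema(), props));
        env.execute("Flink read/write Kafka");
    }
}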