
A kafka-stream Stream Processing Example

程序员文章站 2022-05-22 20:35:52

1. First came the kafka-streams version issue, then the Spring Boot compatibility issue: Spring Boot 2.0 turned out not to work with the kafka-streams 1.0.2 package, so the project stays on Spring Boot 1.5.6.

The dependencies below cost me a long time; I went through many version combinations before landing on these.

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <!-- Spring Boot 1.5.6 works with kafka-streams 1.0.2, the newest streams
             dependency at the time, which simplifies many steps; Spring Boot 2.0
             does not support this kafka-streams version -->
        <version>1.5.6.RELEASE</version>
        <relativePath/>
    </parent>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.12</artifactId>
        <version>1.1.0</version>
        <exclusions>
            <exclusion>
                <groupId>com.101tec</groupId>
                <artifactId>zkclient</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>1.0.2</version>
    </dependency>
    <dependency>
        <groupId>com.101tec</groupId>
        <artifactId>zkclient</artifactId>
        <version>0.10</version>
    </dependency>
    <dependency>
        <groupId>commons-io</groupId>
        <artifactId>commons-io</artifactId>
        <version>2.5</version>
    </dependency>

2. The computation eventually succeeded, but because of serialization the results would not display; every result came out as 0.

Below are the generic (de)serialization classes that work.

GenericDeserializer.java

package cloud.stream.serdes;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import java.util.Map;

// Deserializer implementation: turns JSON bytes back into a POJO

public class GenericDeserializer<T> implements Deserializer<T> {

    private ObjectMapper objectMapper = new ObjectMapper();
    private Class<T> type;

    /**
     * Default constructor needed by Kafka
     */
    public GenericDeserializer() {
    }

    @SuppressWarnings("unchecked")
    @Override
    public void configure(Map<String, ?> props, boolean isKey) {
        // The target POJO class is passed in via the serde properties
        type = (Class<T>) props.get("JsonPOJOClass");
    }

    @Override
    public T deserialize(String topic, byte[] bytes) {
        if (bytes == null)
            return null;

        T data;
        try {
            data = objectMapper.readValue(bytes, type);
        } catch (Exception e) {
            throw new SerializationException(e);
        }

        return data;
    }

    @Override
    public void close() {

    }


}

GenericSerializer.java

 

package cloud.stream.serdes;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import java.util.Map;

// Serializer implementation: writes a POJO as JSON bytes

public class GenericSerializer<T> implements Serializer<T> {

    private ObjectMapper objectMapper = new ObjectMapper();

    public GenericSerializer() {
    }

    @Override
    public void configure(Map<String, ?> props, boolean isKey) {
    }

    @Override
    public byte[] serialize(String topic, T data) {
        if (data == null)
            return null;

        try {
            return objectMapper.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new SerializationException("Error serializing JSON message", e);
        }
    }

    @Override
    public void close() {
    }


}

SerdesFactory.java

 

package cloud.stream.serdes;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

public class SerdesFactory {

    /**
     * Generic serde factory: builds a JSON {@link Serde} for any POJO.
     *
     * @param <T> the class should have a constructor without any
     *        arguments and have a setter and getter for every member variable
     * @param pojoClass POJO class
     * @return instance of {@link Serde}
     */
    public static <T> Serde<T> serdFrom(Class<T> pojoClass) {
        Map<String, Object> serdeProps = new HashMap<>();
        serdeProps.put("JsonPOJOClass", pojoClass);

        final Serializer<T> serializer = new GenericSerializer<>();
        serializer.configure(serdeProps, false);

        final Deserializer<T> deserializer = new GenericDeserializer<>();
        deserializer.configure(serdeProps, false);

        return Serdes.serdeFrom(serializer, deserializer);
    }
}
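The `cloud.stream.model.Statistics` POJO is referenced throughout but never shown in the post. A minimal sketch consistent with how it is used there, assuming a count/sum/avg field layout and the no-arg constructor plus getters and setters that the JSON serde requires (this is a hypothetical reconstruction, not the original class):

```java
// Hypothetical reconstruction of the Statistics model class used by the demo.
public class Statistics {
    private long count;
    private long sum;
    private long avg;

    public Statistics() { }  // required by the Jackson-based serde

    public Statistics(long count, long sum, long avg) {
        this.count = count;
        this.sum = sum;
        this.avg = avg;
    }

    public long getCount() { return count; }
    public void setCount(long count) { this.count = count; }
    public long getSum() { return sum; }
    public void setSum(long sum) { this.sum = sum; }
    public long getAvg() { return avg; }
    public void setAvg(long avg) { this.avg = avg; }

    @Override
    public String toString() {
        return "Statistics{count=" + count + ", sum=" + sum + ", avg=" + avg + "}";
    }

    public static void main(String[] args) {
        // Mimic the incremental aggregation the demo performs per window
        Statistics s = new Statistics(0L, 0L, 0L);
        for (long temp : new long[]{30, 21, 19}) {
            s.setCount(s.getCount() + 1);
            s.setSum(s.getSum() + temp);
            s.setAvg(s.getSum() / s.getCount());
        }
        System.out.println(s);  // prints Statistics{count=3, sum=70, avg=23}
    }
}
```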

Below is the main program code.

 


TemperatureAvgDemo.java

package cloud.stream;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.WindowedDeserializer;
import org.apache.kafka.streams.kstream.internals.WindowedSerializer;
import org.apache.kafka.streams.state.WindowStore;

import cloud.stream.model.Statistics;
import cloud.stream.serdes.SerdesFactory;

/**
 * Compute temperature statistics (count/sum/avg) over 60-second windows.
 * Messages in the topic are either plain numbers such as 30 or 21, or
 * JSON such as {"temp":19, "humidity": 25}.
 */
public class TemperatureAvgDemo {
	 private static final int TEMPERATURE_WINDOW_SIZE = 60;

	    public static void main(String[] args) throws Exception {

	        Properties props = new Properties();
	        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-temp-avg");
	        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "itcast:9092");
	        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
	        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

	        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
	        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
	        
	        // JSON serde for the Statistics POJO
	        final Serde<Statistics> statisticsSerde = SerdesFactory.serdFrom(Statistics.class);

	        StreamsBuilder builder = new StreamsBuilder();

	        KStream<String, String> source = builder.stream("snmp-temp1");
	        KTable<Windowed<String>, Statistics> max = source
	                .selectKey(new KeyValueMapper<String, String, String>() {
	                    @Override
	                    public String apply(String key, String value) {
	                        return "stat";
	                    }
	                })
	                .groupByKey()
	                .windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(TEMPERATURE_WINDOW_SIZE)))
	                .aggregate(
	                        new Initializer<Statistics>() {
	                            @Override
	                            public Statistics apply() {
	                                Statistics avgAndSum = new Statistics(0L,0L,0L);
	                                return avgAndSum;
	                            }
	                        },
	                        new Aggregator<String, String, Statistics>() {
	                            @Override
	                            public Statistics apply(String aggKey, String newValue, Statistics aggValue) {
	                                // messages are plain numbers or JSON like {"temp":19, "humidity": 25}
	                                System.out.println("aggKey:" + aggKey + ", newValue:" + newValue + ", aggValue:" + aggValue);
	                                Long newValueLong = null;
	                                try {
	                                    JSONObject json = JSON.parseObject(newValue);
	                                    newValueLong = json.getLong("temp");
	                                }
	                                catch (ClassCastException ex) {
	                                     newValueLong = Long.valueOf(newValue);
	                                }

	                                aggValue.setCount(aggValue.getCount() + 1);
	                                aggValue.setSum(aggValue.getSum() + newValueLong);
	                                aggValue.setAvg(aggValue.getSum() / aggValue.getCount());

	                                return aggValue;
	                            }
	                        },
	                        Materialized.<String, Statistics, WindowStore<Bytes, byte[]>>as("time-windowed-aggregated-temp-stream-store")
	                                .withValueSerde(statisticsSerde)
	                );

	        WindowedSerializer<String> windowedSerializer = new WindowedSerializer<>(Serdes.String().serializer());
	        WindowedDeserializer<String> windowedDeserializer = new WindowedDeserializer<>(Serdes.String().deserializer(), TEMPERATURE_WINDOW_SIZE);
	        Serde<Windowed<String>> windowedSerde = Serdes.serdeFrom(windowedSerializer, windowedDeserializer);

	        max.toStream().to("result-temp-stat", Produced.with(windowedSerde, statisticsSerde));

	        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
	        final CountDownLatch latch = new CountDownLatch(1);


	        Runtime.getRuntime().addShutdownHook(new Thread("streams-temperature-shutdown-hook") {
	            @Override
	            public void run() {
	                streams.close();
	                latch.countDown();
	            }
	        });

	        try {
	            streams.start();
	            latch.await();
	        } catch (Throwable e) {
	            System.exit(1);
	        }
	        System.exit(0);
	    }
	
}
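For reference, the aggregator above accepts two message formats (a bare number or a JSON object) and falls back to plain number parsing when the fastjson cast fails. That parsing logic can be sketched without fastjson; `TempParser.parseTemp` here is a hypothetical, dependency-free stand-in, not part of the original demo:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TempParser {
    // Naive extraction of the "temp" field; assumes well-formed input
    private static final Pattern TEMP_FIELD =
            Pattern.compile("\"temp\"\\s*:\\s*(-?\\d+)");

    // Accepts either a bare number ("30") or a JSON object
    // ({"temp":19, "humidity": 25}) and extracts the temperature.
    static long parseTemp(String value) {
        String v = value.trim();
        if (v.startsWith("{")) {
            Matcher m = TEMP_FIELD.matcher(v);
            if (!m.find()) {
                throw new IllegalArgumentException("no temp field in: " + value);
            }
            return Long.parseLong(m.group(1));
        }
        return Long.parseLong(v);
    }

    public static void main(String[] args) {
        System.out.println(parseTemp("30"));                              // prints 30
        System.out.println(parseTemp("{\"temp\":19, \"humidity\": 25}")); // prints 19
    }
}
```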

The final output:

[screenshot omitted in this text version]

The result output:

[screenshot omitted in this text version]
