
Connecting Spark to kafka_2.10-0.10.0.0

Required JARs:
chill_2.11-0.8.0.jar
commons-collections-3.2.2.jar
commons-configuration-1.6.jar
commons-lang-2.6.jar
commons-lang3-3.5.jar
commons-logging-1.1.3.jar
guava-14.0.1.jar
hadoop-auth-2.6.4.jar
hadoop-common-2.6.4.jar
jackson-annotations-2.6.5.jar
jackson-core-2.6.5.jar
jackson-databind-2.6.5.jar
jackson-module-paranamer-2.6.5.jar
jackson-module-scala_2.11-2.6.5.jar
javax.servlet-api-3.1.0.jar
javax.ws.rs-api-2.0.1.jar
jersey-container-servlet-core-2.22.2.jar
jersey-server-2.22.2.jar
json4s-ast_2.11-3.2.11.jar
kafka-clients-0.10.2.0.jar
kryo-shaded-3.0.3.jar
log4j-1.2.17.jar
lz4-1.3.0.jar
metrics-core-3.1.2.jar
metrics-json-3.1.2.jar
netty-all-4.0.42.Final.jar
paranamer-2.3.jar
scala-library-2.11.8.jar
slf4j-api-1.7.16.jar
slf4j-log4j12-1.7.16.jar
spark-core_2.11-2.1.0.jar
spark-launcher_2.11-2.1.0.jar
spark-network-common_2.11-2.1.0.jar
spark-network-shuffle_2.11-2.1.0.jar
spark-streaming-kafka-0-10_2.11-2.1.0.jar
spark-streaming_2.11-2.1.0.jar
spark-unsafe_2.11-2.1.0.jar
xbean-asm5-shaded-4.4.jar
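
With kafka-clients-0.10.2.0.jar already on the classpath, a few test messages can be produced from plain Java to exercise the streaming job. The sketch below is illustrative only: the class name TestProducer is made up here, and it assumes a broker on localhost:9092 and the topic "test" used by the consumer code further down.
-----------------------------------------------------------
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TestProducer {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // assumed local broker, matching the consumer below
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    try {
      // push a handful of test messages into the "test" topic
      for (int i = 0; i < 10; i++) {
        producer.send(new ProducerRecord<>("test", Integer.toString(i), "message-" + i));
      }
    } finally {
      producer.close(); // flushes buffered records before exiting
    }
  }
}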

PS: The code below runs on JDK 1.7, i.e., it uses anonymous inner classes rather than lambda expressions (a JDK 8 lambda version is included, commented out, near the end).
-----------------------------------------------------------
import java.util.*;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

public class PrintDirectMsgDirect {
  public static void main(String[] args) {
    try {
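      // local[2]: two local worker threads; Duration(10000) gives a 10-second batch interval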
      SparkConf sparkConf = new SparkConf().setAppName("PrintDirectMsg").setMaster("local[2]");
      final JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000));

      String brokers = "localhost:9092";
      String topics = "test";
      Collection<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
      Map<String, Object> kafkaP = new HashMap<>();
      kafkaP.put("bootstrap.servers", brokers);
      kafkaP.put("group.id", "group1");
      // the consumer side only needs deserializers; serializers are a producer setting
      kafkaP.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      kafkaP.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

      // direct stream: PreferConsistent spreads partitions evenly across executors;
      // Subscribe reads the given topics with the consumer settings above
      JavaInputDStream<ConsumerRecord<String, String>> lines = KafkaUtils.createDirectStream(
              jssc,
              LocationStrategies.PreferConsistent(),
              ConsumerStrategies.<String, String>Subscribe(topicsSet, kafkaP)
      );

      lines.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
        @Override
        public void call(JavaRDD<ConsumerRecord<String, String>> rdd) throws Exception {
          rdd.foreach(new VoidFunction<ConsumerRecord<String, String>>() {
            @Override
            public void call(ConsumerRecord<String, String> record) throws Exception {
              System.out.println(">>>>>>>>>>record.value:[" + record.value() + "]");
            }
          });
        }
      });
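      // JDK 8 lambda equivalent of the anonymous-class version above: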
//      lines.foreachRDD(rdd -> {
//        rdd.foreach(x -> {
//          System.out.println(">>>>>>>>>>>>>>>>x:" + x + "]");
//          System.out.println(">>>>>>>>>>>>>>>>x.value:" + x.value() + "]");
//        });
//      });
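      // start the streaming job and block until it is stopped or interrupted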
      jssc.start();
      jssc.awaitTermination();
      jssc.close();
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}
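
If enable.auto.commit is left at the Kafka consumer default of true, offsets are committed on a timer, independent of whether Spark has finished processing the records, so a failed batch can be silently skipped on restart. spark-streaming-kafka-0-10 supports committing offsets back to Kafka only after each batch succeeds, via HasOffsetRanges, OffsetRange, and CanCommitOffsets (all in org.apache.spark.streaming.kafka010, so three extra imports are needed). The fragment below is a sketch that reuses jssc, topicsSet, and kafkaP from the example above; only the variable name stream and the offset-handling lines are new.
-----------------------------------------------------------
kafkaP.put("enable.auto.commit", false); // commit manually instead of on a timer

final JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
        jssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(topicsSet, kafkaP)
);

stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
  @Override
  public void call(JavaRDD<ConsumerRecord<String, String>> rdd) throws Exception {
    // capture this batch's offset ranges first; the cast only works on the
    // RDD that comes directly from the Kafka input stream
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

    // ... process the batch here ...

    // commit only after the batch's work has completed
    ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
  }
});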