Connecting Spark to kafka_2.10-0.10.0.0
Required jars:
chill_2.11-0.8.0.jar
commons-collections-3.2.2.jar
commons-configuration-1.6.jar
commons-lang-2.6.jar
commons-lang3-3.5.jar
commons-logging-1.1.3.jar
guava-14.0.1.jar
hadoop-auth-2.6.4.jar
hadoop-common-2.6.4.jar
jackson-annotations-2.6.5.jar
jackson-core-2.6.5.jar
jackson-databind-2.6.5.jar
jackson-module-paranamer-2.6.5.jar
jackson-module-scala_2.11-2.6.5.jar
javax.servlet-api-3.1.0.jar
javax.ws.rs-api-2.0.1.jar
jersey-container-servlet-core-2.22.2.jar
jersey-server-2.22.2.jar
json4s-ast_2.11-3.2.11.jar
kafka-clients-0.10.2.0.jar
kryo-shaded-3.0.3.jar
log4j-1.2.17.jar
lz4-1.3.0.jar
metrics-core-3.1.2.jar
metrics-json-3.1.2.jar
netty-all-4.0.42.Final.jar
paranamer-2.3.jar
scala-library-2.11.8.jar
slf4j-api-1.7.16.jar
slf4j-log4j12-1.7.16.jar
spark-core_2.11-2.1.0.jar
spark-launcher_2.11-2.1.0.jar
spark-network-common_2.11-2.1.0.jar
spark-network-shuffle_2.11-2.1.0.jar
spark-streaming-kafka-0-10_2.11-2.1.0.jar
spark-streaming_2.11-2.1.0.jar
spark-unsafe_2.11-2.1.0.jar
xbean-asm5-shaded-4.4.jar
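If you pull dependencies with Maven instead of collecting jars by hand, most of the list above comes in transitively from org.apache.spark:spark-core_2.11:2.1.0, org.apache.spark:spark-streaming_2.11:2.1.0 and org.apache.spark:spark-streaming-kafka-0-10_2.11:2.1.0. Either way, all Spark and Scala artifacts must share one Scala suffix (_2.11 here); mixing _2.10 and _2.11 jars on the classpath fails at runtime.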
PS: the version below runs on JDK 1.7, i.e. it uses anonymous inner classes rather than lambda expressions.
-----------------------------------------------------------
import java.util.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
public class PrintDirectMsgDirect {
    public static void main(String[] args) {
        try {
            // Local mode with two worker threads; 10-second batch interval.
            SparkConf sparkConf = new SparkConf().setAppName("PrintDirectMsg").setMaster("local[2]");
            final JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000));
            String brokers = "localhost:9092";
            String topics = "test";
            Collection<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
            // Consumer config for the new (0.10) Kafka consumer API. A consumer needs
            // deserializers, not serializers, and the 0.8-era "metadata.broker.list"
            // setting is replaced by "bootstrap.servers".
            Map<String, Object> kafkaP = new HashMap<>();
            kafkaP.put("bootstrap.servers", brokers);
            kafkaP.put("group.id", "group1");
            kafkaP.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            kafkaP.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            final JavaInputDStream<ConsumerRecord<String, String>> lines = KafkaUtils.createDirectStream(
                    jssc,
                    LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.<String, String>Subscribe(topicsSet, kafkaP)
            );
            // JDK 1.7 style: anonymous inner classes instead of lambdas.
            lines.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
                @Override
                public void call(JavaRDD<ConsumerRecord<String, String>> rdd) throws Exception {
                    rdd.foreach(new VoidFunction<ConsumerRecord<String, String>>() {
                        @Override
                        public void call(ConsumerRecord<String, String> record) throws Exception {
                            System.out.println(">>>>>>>>>> record value: [" + record.value() + "]");
                        }
                    });
                }
            });
            // Java 8 lambda equivalent of the loop above:
            // lines.foreachRDD(rdd -> {
            //     rdd.foreach(x -> {
            //         System.out.println(">>>>>>>>>>>>>>>> x: [" + x + "]");
            //         System.out.println(">>>>>>>>>>>>>>>> x.value: [" + x.value() + "]");
            //     });
            // });
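            // A sketch of manual offset commits, following the Spark 2.1 Kafka 0-10 integration
            // guide. It needs HasOffsetRanges, OffsetRange and CanCommitOffsets imported from
            // org.apache.spark.streaming.kafka010, and kafkaP.put("enable.auto.commit", false):
            // lines.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
            //     @Override
            //     public void call(JavaRDD<ConsumerRecord<String, String>> rdd) throws Exception {
            //         OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
            //         // ... process the RDD here, then commit the consumed offsets:
            //         ((CanCommitOffsets) lines.inputDStream()).commitAsync(offsetRanges);
            //     }
            // });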
            jssc.start();
            // Block until the streaming context is stopped or the job fails.
            jssc.awaitTermination();
            jssc.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
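-----------------------------------------------------------
To push some test messages into the "test" topic, a minimal producer against the same kafka-clients 0.10 API can look like this (a sketch: the broker address and topic match the streaming job above, and TestProducer is just an example class name):
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class TestProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // A producer needs serializers (the consumer above uses the matching deserializers).
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            // Each record should show up in the streaming job's output within one batch interval.
            producer.send(new ProducerRecord<String, String>("test", "msg-" + i));
        }
        producer.close();
    }
}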