Spark Streaming Learning Example
With the growth of cloud computing and related technologies, real-time analytics has become an increasingly important topic. Many domains need to process data in real time, especially since the rise of the Internet of Things, where massive volumes of data must be stored, processed, and analyzed in real time or near real time. Many technologies therefore try to provide this real-time computing capability. Frameworks such as Storm and Spark now dominate the market, and Spark in particular, which unifies batch processing, stream processing, graph computation, and machine learning, has been widely adopted by companies, universities, and IT practitioners. Many people have used Spark SQL, but when it comes to Spark Streaming, many are like I was when I first encountered it: unsure where to start. The small example below is offered for learning and reference.
1. General workflow of Spark Streaming programming
This article uses consuming data from the Kafka message broker as its running example.
1) Add dependencies
Spark Streaming is an extension of the Spark core module, so stream processing requires pulling in the corresponding dependencies. This project is built with Maven, so the following dependencies are added to pom.xml.
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
    <!--
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
    -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
</dependencies>
2) Initialize the execution environment: JavaStreamingContext
// Initialize the execution environment.
// Note: setMaster must not be plain "local"; with only a single thread there is
// no thread left to process the received input stream.
SparkConf conf = new SparkConf().setAppName("SparkStreamingAPP").setMaster("local[4]");
JavaSparkContext context = new JavaSparkContext(conf);
JavaStreamingContext jsc = new JavaStreamingContext(context, Durations.seconds(5));
3) Create a DStream and apply DStream transformations
Properties properties = Config.getConfig("consumer.properties");
// Kafka subscription parameters.
Collection<String> topics = Arrays.asList(properties.get("topics").toString().split(","));
Map<String, Object> kafkaParams = new HashMap<String, Object>();
// Host/port pairs (host1:port1,host2:port2,...) used only for the initial connection;
// the client discovers the full, possibly changing, cluster membership from them.
kafkaParams.put("bootstrap.servers", properties.get("bootstrap.servers"));
// Deserializer classes for the record key and value; both must implement
// org.apache.kafka.common.serialization.Deserializer.
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
// Unique string identifying the consumer group this consumer belongs to; required
// when using subscribe(topic) or Kafka-based offset management.
kafkaParams.put("group.id", properties.get("group.id"));
// Create the input DStream by subscribing to Kafka.
JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(jsc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String> Subscribe(topics, kafkaParams));
// Turn each ConsumerRecord into a (key, value) pair.
JavaPairDStream<String, String> pairDStream = stream
        .mapToPair(new PairFunction<ConsumerRecord<String, String>, String, String>() {
            public Tuple2<String, String> call(ConsumerRecord<String, String> record) {
                return new Tuple2<String, String>(record.key(), record.value());
            }
        });
// Print the contents of each batch to the driver console.
pairDStream.foreachRDD(new VoidFunction<JavaPairRDD<String, String>>() {
    public void call(JavaPairRDD<String, String> arg0) throws Exception {
        System.out.println(arg0.collect());
    }
});
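The code above only turns records into key/value pairs and prints each batch. If you want a slightly richer transformation, the following minimal sketch (not part of the original program; it assumes Java 8 lambdas and reuses the stream variable created above) counts the words in the message values of every 5-second batch:
// Sketch: word count over the message values of each batch.
JavaPairDStream<String, Integer> wordCounts = stream
        .flatMap(record -> Arrays.asList(record.value().split(" ")).iterator())
        .mapToPair(word -> new Tuple2<String, Integer>(word, 1))
        .reduceByKey((a, b) -> a + b);
// Print the first few counts of every batch to the driver log.
wordCounts.print();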
4) Start the streaming computation
// Start the computation and wait for it to terminate.
jsc.start();
jsc.awaitTermination();
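If the job ever needs to be stopped from code rather than being killed, the context can also be shut down gracefully. A minimal sketch, separate from the main flow above:
// Stop the streaming context gracefully: finish the batches already received,
// then also stop the underlying SparkContext.
jsc.stop(true, true);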
2. Package and run on a cluster, or run locally
If the application is packaged and run on a cluster, there is no need to set the master when initializing the execution environment; it can be set in the submit script instead.
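For example, a minimal sketch of the initialization without a hard-coded master (the master URL then comes from spark-submit's --master option):
// No setMaster here; the master URL is supplied by spark-submit --master.
SparkConf conf = new SparkConf().setAppName("SparkStreamingAPP");
JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));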
In non-YARN (standalone) mode, the job can be started with a script like the following:
#!/bin/bash
spark-submit --master spark://master:7077 --class KafkaSparkStreming.SparkStreaming.SparkStreamingConsumer kafkaSparkStreaming.jar
In YARN mode, a script like the following can be used:
#!/bin/bash
spark-submit --master yarn --class KafkaSparkStreming.SparkStreaming.SparkStreamingConsumer kafkaSparkStreaming.jar
3. Create a producer and write messages to Kafka
To exercise the program above, we also need a producer that keeps writing messages into Kafka for the Spark Streaming job to consume. The relevant pieces are listed below.
Maven dependency
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.0.1</version>
    </dependency>
</dependencies>
/**
 * Producer: writes a test message to Kafka once per second.
 */
public class MyProducer {
    public static void main(String[] args) {
        Properties pro = Config.getConfig("producer.properties");
        Producer<String, String> producer = new KafkaProducer<String, String>(pro);
        for (int i = 0; i < 10000; i++) {
            producer.send(new ProducerRecord<String, String>(pro.get("topic").toString(), "this is message " + i));
            try {
                // Send one message per second.
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        producer.close();
    }
}
public class Config {
    /** Load a properties file from the classpath. */
    public static Properties getConfig(String configFilePath) {
        Properties pro = new Properties();
        ClassLoader cl = Config.class.getClassLoader();
        if (null == cl) return null;
        InputStream inputStream = cl.getResourceAsStream(configFilePath);
        try {
            pro.load(inputStream);
            inputStream.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return pro;
    }
}
#kafka configuration
bootstrap.servers = master:9092,slave1:9092,slave2:9092
#specify the key's serializer class
key.serializer = org.apache.kafka.common.serialization.StringSerializer
#specify the value's serializer class
value.serializer = org.apache.kafka.common.serialization.StringSerializer
# ack
acks = all
#specify the topic
topic = test
retries = 0
batch.size = 16384
linger.ms = 1
buffer.memory = 33554432
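The consumer.properties file loaded by the streaming job via Config.getConfig("consumer.properties") is not shown in the article. Based on the keys the consumer code reads, it might look like the following sketch (the topics and group.id values are assumptions):
#kafka configuration for the Spark Streaming consumer (sketch, values assumed)
bootstrap.servers = master:9092,slave1:9092,slave2:9092
#comma-separated list of topics to subscribe to
topics = test
#consumer group id (any unique string)
group.id = spark-streaming-demo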
The full example is available at https://download.csdn.net/my/uploads. Since CSDN does not let uploads be marked as free (zero-point) downloads, contact QQ 144712031 if you need it.