Windows下IDEA远程调试Spark Streaming
文章目录
前言
本来半年前就应该发出来了,结果一拖就拖到了现在,真!是!决!定!了!就!要!立!即!去!做!啊!
Spark版本:2.1.2
Kafka版本:1.0.0
Linux系统:CentOS6.9
场景:
做Spark Streaming开发,在Win7下使用IDE进行开发,希望在本地IDEA上远程连接服务器上的大数据集群进行调试,这里仅连服务器上的Kafka和Zookeeper服务
- 我这里在Windows下使用IDEA进行开发,相信本文章对Eclipse的朋友也有帮助
- 服务器上的大数据集群已经全部安装好了(Hadoop、Hive、HBase、Spark、Flume、Sqoop、Zookeeper、Hue等等)
- 这里Kafka的安装、Spark的安装、以及其他大数据服务的安装不再赘述
- Spark不用安装,在IDEA里跑local模式不需要安装Spark
Kafka的安装可以我的这篇博客:
Centos安装配置kafka1.0.0
ClouderaManager的安装可以参考我的这篇博客:
Centos6离线安装CDH5.14.2最全详细教程-1前言
当然这个比较重量级,除了Kafka要是你不需要其他大数据服务的话,你只需要再装一个Zookeeper即可,单独安装Zookeeper就去网上再搜下教程吧.
第一步:后台启动Kafka
nohup bin/kafka-server-start.sh config/server.properties &
第二步:创建Kafka Topic
如果已经创建的话跳过
bin/kafka-topics.sh --create --zookeeper cm02.spark.com:2181 --replication-factor 1 --partitions 1 --topic test
第三步:启动Kafka的生产者
(cm02.spark.com为集群中一台服务器的主机名(映射了IP地址))
bin/kafka-console-producer.sh --broker-list cm02.spark.com:9092 --topic test
第四步:一个简单的Demo
需要添加的maven依赖jar包:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.1.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.1.2</version>
</dependency>
一个简单的Spark Streaming Demo
import org.apache.kafka.common.serialization.{IntegerDeserializer, StringDeserializer}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* Demo
* Spark Streaming远程连接Zookeeper读取Kafka的数据进行单词统计
* Created by CaoWeiDong on 2017-11-22.
*/
object ScalaKafkaWordCount {
def main(args: Array[String]): Unit = {
//Zookeeper连接地址
val zkQuorum = "n4:9092"
//Kafka连接信息(Topic Partition)
val topics = Array("test")
val kafkaMap = Map[String, Object](
"bootstrap.servers" -> zkQuorum,
"key.deserializer" -> classOf[IntegerDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "test",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean),
"session.timeout.ms" -> "30000"
)
//conf
val conf = new SparkConf()
.setAppName(ScalaKafkaWordCount.getClass.getSimpleName)
.setMaster("local[4]")
//SparkStreaming
val ssc = new StreamingContext(conf, Seconds(2))
val consumer = ConsumerStrategies.Subscribe[String, String](topics, kafkaMap)
//SparkStreaming连接Kafka
val lines = KafkaUtils.createDirectStream(
ssc,
LocationStrategies.PreferConsistent,
consumer)
.map(_.value())
//以空格进行切分,统计单词个数
val wordCounts = lines.flatMap(_.split(" "))
.map(key => (key, 1L))
.reduceByKey(_ + _)
//打印
wordCounts.print()
//启动
ssc.start()
ssc.awaitTermination()
}
}
第五步:运行Demo
第六步:准备数据
Hadoop Spark Kafka Flume
Hive HBase Flink Sqoop
Mapreduce yarn hdfs
Spark Storm Kylin Sqoop HBase Hadoop
Kafka Kylin hdfs Flume
将上面的数据贴到Kafka的Producer
第七步:IDEA下Spark Streaming的运行结果
最后:总结
其实这在Windows下IDEA下Spark Streaming通过local模式进行调试最关键的一行代码是:
val conf = new SparkConf()
.setAppName(ScalaKafkaWordCount.getClass.getSimpleName)
.setMaster("local[4]")
原来的时候我local的进程数是1,虽然运行时一直没有报错,但是Kafka生产的数据一直消费不了
后来调成4了以后就可以了,听朋友说是因为Spark Streaming需要开启多个进程,一个进程接收数据,一个处理数据,一开始调成2不行,然后调成4就可以了
上一篇: MySQL ERROR 1045 (28000): Access denied for user 'root'@'localhost' (using passw
下一篇: 89C52单片机中断与 I/O 控制
推荐阅读
-
Windows下IDEA远程调试Spark Streaming
-
windows下,IDEA中基于maven配置spark开发环境
-
idea远程调试spark的步骤讲解
-
使用windows下的Eclipse或者IDEA远程连接Linux的Hadoop并运行wordcount
-
Windows下IntelliJ IDEA远程连接服务器中Hadoop运行WordCount(详细版)
-
有哪位大哥在 IntelliJ idea 中用过 openshift 么,下怎么远程调试,谢谢
-
有哪位大哥在 IntelliJ idea 中用过 openshift 么,下怎么远程调试,谢谢
-
使用Intellij IDEA远程调试Spark程序
-
利用idea对spark程序进行远程提交和调试
-
idea远程调试 spark