欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Windows下IDEA远程调试Spark Streaming

程序员文章站 2024-02-21 20:44:22
...

前言

本来半年前就应该发出来了,结果一拖就拖到了现在,真!是!决!定!了!就!要!立!即!去!做!啊!
Spark版本:2.1.2
Kafka版本:1.0.0
Linux系统:CentOS6.9
场景:
做Spark Streaming开发,在Win7下使用IDE进行开发,希望在本地IDEA上远程连接服务器上的大数据集群进行调试,这里仅连服务器上的Kafka和Zookeeper服务

  • 我这里在Windows下使用IDEA进行开发,相信本文章对Eclipse的朋友也有帮助
  • 服务器上的大数据集群已经全部安装好了(Hadoop、Hive、HBase、Spark、Flume、Sqoop、Zookeeper、Hue等等)
  • 这里Kafka的安装、Spark的安装、以及其他大数据服务的安装不再赘述
  • Spark不用安装,在IDEA里跑local模式不需要安装Spark

Kafka的安装可以我的这篇博客:
Centos安装配置kafka1.0.0

ClouderaManager的安装可以参考我的这篇博客:
Centos6离线安装CDH5.14.2最全详细教程-1前言
当然这个比较重量级,除了Kafka要是你不需要其他大数据服务的话,你只需要再装一个Zookeeper即可,单独安装Zookeeper就去网上再搜下教程吧.

第一步:后台启动Kafka

nohup bin/kafka-server-start.sh config/server.properties &

第二步:创建Kafka Topic

如果已经创建的话跳过

bin/kafka-topics.sh --create --zookeeper cm02.spark.com:2181 --replication-factor 1 --partitions 1 --topic test

第三步:启动Kafka的生产者

(cm02.spark.com为集群中一台服务器的主机名(映射了IP地址))

bin/kafka-console-producer.sh --broker-list cm02.spark.com:9092 --topic test

第四步:一个简单的Demo

需要添加的maven依赖jar包:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.1.2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.1.2</version>
</dependency>

一个简单的Spark Streaming Demo

import org.apache.kafka.common.serialization.{IntegerDeserializer, StringDeserializer}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Demo
  * Spark Streaming远程连接Zookeeper读取Kafka的数据进行单词统计
  * Created by CaoWeiDong on 2017-11-22.
  */
object ScalaKafkaWordCount {
    def main(args: Array[String]): Unit = {
        //Zookeeper连接地址
        val zkQuorum = "n4:9092"
        //Kafka连接信息(Topic Partition)
        val topics = Array("test")
        val kafkaMap = Map[String, Object](
            "bootstrap.servers" -> zkQuorum,
            "key.deserializer" -> classOf[IntegerDeserializer],
            "value.deserializer" -> classOf[StringDeserializer],
            "group.id" -> "test",
            "auto.offset.reset" -> "latest",
            "enable.auto.commit" -> (false: java.lang.Boolean),
            "session.timeout.ms" -> "30000"
        )
        //conf
        val conf = new SparkConf()
            .setAppName(ScalaKafkaWordCount.getClass.getSimpleName)
            .setMaster("local[4]")
        //SparkStreaming
        val ssc = new StreamingContext(conf, Seconds(2))
        val consumer = ConsumerStrategies.Subscribe[String, String](topics, kafkaMap)
        //SparkStreaming连接Kafka
        val lines = KafkaUtils.createDirectStream(
            ssc,
            LocationStrategies.PreferConsistent,
            consumer)
            .map(_.value())
        //以空格进行切分,统计单词个数
        val wordCounts = lines.flatMap(_.split(" "))
            .map(key => (key, 1L))
            .reduceByKey(_ + _)

        //打印
        wordCounts.print()
        //启动
        ssc.start()
        ssc.awaitTermination()
    }
}

第五步:运行Demo

Windows下IDEA远程调试Spark Streaming

第六步:准备数据

Hadoop Spark Kafka Flume
Hive HBase Flink Sqoop
Mapreduce yarn hdfs
Spark Storm Kylin Sqoop HBase Hadoop
Kafka Kylin hdfs Flume

将上面的数据贴到Kafka的Producer
Windows下IDEA远程调试Spark Streaming

第七步:IDEA下Spark Streaming的运行结果

Windows下IDEA远程调试Spark Streaming

最后:总结

其实这在Windows下IDEA下Spark Streaming通过local模式进行调试最关键的一行代码是:

val conf = new SparkConf()
.setAppName(ScalaKafkaWordCount.getClass.getSimpleName)
.setMaster("local[4]")

原来的时候我local的进程数是1,虽然运行时一直没有报错,但是Kafka生产的数据一直消费不了
后来调成4了以后就可以了,听朋友说是因为Spark Streaming需要开启多个进程,一个进程接收数据,一个处理数据,一开始调成2不行,然后调成4就可以了